From a85b06ce0ee6c97e0ba9225d16e48f8aeca894a2 Mon Sep 17 00:00:00 2001 From: Patrick Cleeve Date: Fri, 21 Feb 2025 12:48:06 +1100 Subject: [PATCH 1/2] [feat] add milling structures, functions, manager --- .github/workflows/unit_tests_cloud.yml | 7 + src/odemis/acq/drift/__init__.py | 29 +++ src/odemis/acq/milling/milling_tasks.yaml | 90 +++++++ src/odemis/acq/milling/millmng.py | 221 +++++++++++++++++ src/odemis/acq/milling/patterns.py | 248 +++++++++++++++++++ src/odemis/acq/milling/plotting.py | 153 ++++++++++++ src/odemis/acq/milling/tasks.py | 115 +++++++++ src/odemis/acq/milling/test/patterns_test.py | 229 +++++++++++++++++ src/odemis/acq/milling/test/tasks_test.py | 153 ++++++++++++ src/odemis/driver/autoscript_client.py | 3 +- 10 files changed, 1247 insertions(+), 1 deletion(-) create mode 100644 src/odemis/acq/milling/milling_tasks.yaml create mode 100644 src/odemis/acq/milling/millmng.py create mode 100644 src/odemis/acq/milling/patterns.py create mode 100644 src/odemis/acq/milling/plotting.py create mode 100644 src/odemis/acq/milling/tasks.py create mode 100644 src/odemis/acq/milling/test/patterns_test.py create mode 100644 src/odemis/acq/milling/test/tasks_test.py diff --git a/.github/workflows/unit_tests_cloud.yml b/.github/workflows/unit_tests_cloud.yml index 53127099f4..84a29465a6 100644 --- a/.github/workflows/unit_tests_cloud.yml +++ b/.github/workflows/unit_tests_cloud.yml @@ -167,6 +167,13 @@ jobs: export PYTHONPATH="$PWD/src:$PYTHONPATH" python3 -m unittest src/odemis/acq/test/feature_test.py --verbose + - name: Run tests from odemis.acq.milling + if: ${{ !cancelled() }} + run: | + export PYTHONPATH="$PWD/src:$PYTHONPATH" + python3 -m unittest src/odemis/acq/milling/test/patterns_test.py --verbose + python3 -m unittest src/odemis/acq/milling/test/tasks_test.py --verbose + - name: Run tests from odemis.acq.leech if: ${{ !cancelled() }} run: | diff --git a/src/odemis/acq/drift/__init__.py b/src/odemis/acq/drift/__init__.py index 4267e98089..52a5236026 100644 --- a/src/odemis/acq/drift/__init__.py +++ b/src/odemis/acq/drift/__init__.py @@ -28,6 +28,7 @@ import numpy import cv2 +from odemis import model from odemis.acq.align.shift import MeasureShift MIN_RESOLUTION = (20, 20) # sometimes 8x8 works, but it's not reliable enough @@ -347,3 +348,31 @@ def GuessAnchorRegion(whole_img, sample_region): (occurrences[0, 1] + (dc_shape[1] / 2)) / whole_img.shape[1]) return anchor_roi + +def align_reference_image( + ref_image: model.DataArray, + new_image: model.DataArray, + scanner: model.Emitter +) -> None: + """Align the new image to the reference image using beam shift. + Only supports 2D images with the same resolution. + :param ref_image: The reference image to align with. + :param new_image: The new image to align to the reference. + :param scanner: The scanner to align with. + :return: None + """ + if (ref_image.ndim != 2 or new_image.ndim != 2 or ref_image.shape != new_image.shape): + raise ValueError(f"Only equally sized 2D images are supported for alignment. {ref_image.shape}, {new_image.shape}") + + if ref_image.metadata[model.MD_PIXEL_SIZE] != new_image.metadata[model.MD_PIXEL_SIZE]: + raise ValueError("The images must have the same pixel size.") + + shift_px = MeasureShift(ref_image, new_image, 2) + + pixelsize = ref_image.metadata[model.MD_PIXEL_SIZE] + shift_m = (shift_px[0] * pixelsize[0], shift_px[1] * pixelsize[1]) + + previous_shift = scanner.shift.value + shift = (shift_m[0] + previous_shift[0], shift_m[1] + previous_shift[1]) # m + scanner.shift.value = shift + logging.debug(f"reference image alignment: previous: {previous_shift}, calculated shift: {shift_m}, beam shift: {scanner.shift.value}") diff --git a/src/odemis/acq/milling/milling_tasks.yaml b/src/odemis/acq/milling/milling_tasks.yaml new file mode 100644 index 0000000000..8a7dcc02e2 --- /dev/null +++ b/src/odemis/acq/milling/milling_tasks.yaml @@ -0,0 +1,90 @@ +# Contains the configuration for the milling tasks in the format: +# +# task_name: +# milling: Milling parameters +# patterns: List of patterns (and parameters) to mill +'Rough Milling 01': + name: 'Rough Milling 01' + milling: + current: 1.0e-9 + voltage: 30000.0 + field_of_view: 8e-05 + mode: 'Serial' + channel: 'ion' + patterns: + - name: 'Rough Trench' + width: 1.0e-05 + height: 6.0e-06 + depth: 1.0e-06 + spacing: 3.0e-06 + center_x: 0 + center_y: 0 + pattern: 'trench' +'Rough Milling 02': + name: 'Rough Milling 02' + milling: + current: 0.2e-9 + voltage: 30000.0 + field_of_view: 8e-05 + mode: 'Serial' + channel: 'ion' + patterns: + - name: 'Rough Trench 02' + width: 9.5e-6 + height: 4.0e-06 + depth: 0.8e-06 + spacing: 1.5e-06 + center_x: 0 + center_y: 0 + pattern: 'trench' +'Polishing 01': + name: 'Polishing 01' + milling: + current: 60.0e-12 + voltage: 30000.0 + field_of_view: 8e-05 + mode: 'Serial' + channel: 'ion' + patterns: + - name: 'Polishing Trench 01' + width: 9.0e-06 + height: 1.0e-06 + depth: 0.6e-06 + spacing: 600.0e-09 + center_x: 0 + center_y: 0 + pattern: 'trench' +'Polishing 02': + name: 'Polishing 02' + milling: + current: 60.0e-12 + voltage: 30000.0 + field_of_view: 8e-05 + mode: 'Serial' + channel: 'ion' + patterns: + - name: 'Polishing Trench 02' + width: 8.5e-06 + height: 0.6e-06 + depth: 0.5e-06 + spacing: 300.0e-09 + center_x: 0 + center_y: 0 + pattern: 'trench' +'Microexpansion': + name: 'Microexpansion' + milling: + current: 1.0e-9 + voltage: 30000.0 + field_of_view: 8e-05 + mode: 'Serial' + channel: 'ion' + patterns: + - name: 'Microexpansion' + width: 0.5e-06 + height: 1.5e-05 + depth: 1.0e-06 + spacing: 1.0e-05 + center_x: 0 + center_y: 0 + pattern: 'microexpansion' diff --git a/src/odemis/acq/milling/millmng.py b/src/odemis/acq/milling/millmng.py new file mode 100644 index 0000000000..92dfb9966e --- /dev/null +++ b/src/odemis/acq/milling/millmng.py @@ -0,0 +1,221 @@ +# -*- coding: utf-8 -*- +""" +@author: Patrick Cleeve + +Copyright © 2025 Delmic + +This file is part of Odemis. + +Odemis is free software: you can redistribute it and/or modify it under the +terms of the GNU General Public License version 2 as published by the Free +Software Foundation. + +Odemis is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +Odemis. If not, see http://www.gnu.org/licenses/. + + +### Purpose ### + +This module contains classes to control the actions related to the milling. + +""" +import logging +import threading +import time +from concurrent.futures._base import ( + CANCELLED, + FINISHED, + RUNNING, + CancelledError, + Future, +) +from typing import List + +from odemis import model +from odemis.acq.acqmng import acquire +from odemis.acq.drift import align_reference_image +from odemis.acq.milling.patterns import ( + RectanglePatternParameters, +) +from odemis.acq.milling.tasks import MillingTaskSettings +from odemis.acq.stream import FIBStream +from odemis.util import executeAsyncTask + + +class TFSMillingTaskManager: + """This class manages running milling tasks.""" + + def __init__(self, future: Future, tasks: List[MillingTaskSettings], fib_stream: FIBStream): + """ + :param future: the future that will be executing the task + :param tasks: The milling tasks to run (in order) + """ + + self.fibsem = model.getComponent(role="fibsem") + self.tasks = tasks + + # for reference image alignment + self.fib_stream = fib_stream + + self._future = future + if future is not None: + self._future.running_subf = model.InstantaneousFuture() + self._future._task_lock = threading.Lock() + + def cancel(self, future: Future) -> bool: + """ + Canceler of acquisition task. + :param future: the future that will be executing the task + :return: True if it successfully cancelled (stopped) the future + """ + logging.debug("Canceling milling procedure...") + + with future._task_lock: + if future._task_state == FINISHED: + return False + future._task_state = CANCELLED + future.running_subf.cancel() + self.fibsem.stop_milling() + logging.debug("Milling procedure cancelled.") + return True + + def estimate_milling_time(self) -> float: + """ + Estimates the milling time for the given patterns. + :return: (float > 0): the estimated time is in seconds + """ + return self.fibsem.estimate_milling_time() + + def run_milling(self, settings: MillingTaskSettings): + """Run the milling task with the given settings. ThermoFisher implementation""" + + # get the milling settings + milling_current = settings.milling.current.value + milling_voltage = settings.milling.voltage.value + milling_fov = settings.milling.field_of_view.value + milling_channel = settings.milling.channel.value + milling_mode = settings.milling.mode.value + align_at_milling_current = settings.milling.align.value + + # get initial imaging settings + imaging_current = self.fibsem.get_beam_current(milling_channel) + imaging_voltage = self.fibsem.get_high_voltage(milling_channel) + imaging_fov = self.fibsem.get_field_of_view(milling_channel) + + try: + + # acquire a reference image at the imaging settings + if align_at_milling_current: + self._future.running_subf = acquire([self.fib_stream]) + data, _ = self._future.running_subf.result() + ref_image = data[0] + + # set the milling state + self.fibsem.clear_patterns() + self.fibsem.set_default_patterning_beam_type(milling_channel) + self.fibsem.set_high_voltage(milling_voltage, milling_channel) + self.fibsem.set_beam_current(milling_current, milling_channel) + # self.fibsem.set_field_of_view(milling_fov, milling_channel) # tmp: disable until matched in gui + self.fibsem.set_patterning_mode(milling_mode) + + # acquire a new image at the milling settings and align + if align_at_milling_current: + self._future.running_subf = acquire([self.fib_stream]) + data, _ = self._future.running_subf.result() + new_image = data[0] + align_reference_image(ref_image, new_image, self.ion_beam) + + # draw milling patterns to microscope + for pattern in settings.generate(): + if isinstance(pattern, RectanglePatternParameters): + self.fibsem.create_rectangle(pattern.to_dict()) + else: + raise NotImplementedError(f"Pattern {pattern} not supported") # TODO: support other patterns + + # estimate the milling time + estimated_time = self.fibsem.estimate_milling_time() + self._future.set_end_time(time.time() + estimated_time) + + # start patterning (async) + self.fibsem.start_milling() + + # wait for milling to finish + elapsed_time = 0 + wait_time = 5 + while self.fibsem.get_patterning_state() == "Running": + + with self._future._task_lock: + if self._future.cancelled() == CANCELLED: + raise CancelledError() + + logging.debug(f"Milling in progress... elapsed time: {elapsed_time} s, estimated time: {estimated_time} s") + time.sleep(wait_time) + elapsed_time += wait_time + + except CancelledError as ce: + logging.debug(f"Cancelled milling: {ce}") + raise + except Exception as e: + logging.exception(f"Error while milling: {e}") + raise + finally: + # restore imaging state + self.fibsem.set_beam_current(imaging_current, milling_channel) + self.fibsem.set_high_voltage(imaging_voltage, milling_channel) + self.fibsem.set_field_of_view(imaging_fov, milling_channel) + self.fibsem.clear_patterns() + return + + def run(self): + """ + The main function of the task class, which will be called by the future asynchronously + """ + self._future._task_state = RUNNING + + try: + for task in self.tasks: + + with self._future._task_lock: + if self._future._task_state == CANCELLED: + raise CancelledError() + + logging.debug(f"Running milling task: {task.name}") + + self.run_milling(task) + logging.debug("The milling completed") + + except CancelledError: + logging.debug("Stopping because milling was cancelled") + raise + except Exception: + logging.warning("The milling failed") + raise + finally: + self._future._task_state = FINISHED + + +# TODO: replace with run_milling_tasks_openfibsem +def run_milling_tasks(tasks: List[MillingTaskSettings], fib_stream: FIBStream) -> Future: + """ + Run multiple milling tasks in order. + :param tasks: List of milling tasks to be executed in order. + :return: ProgressiveFuture + """ + # Create a progressive future with running sub future + future = model.ProgressiveFuture() + # create acquisition task + milling_task_manager = TFSMillingTaskManager(future, tasks, fib_stream) + # add the ability of cancelling the future during execution + future.task_canceller = milling_task_manager.cancel + + # set the progress of the future (TODO: fix dummy time estimate) + future.set_end_time(time.time() + 10 * len(tasks)) + + # assign the acquisition task to the future + executeAsyncTask(future, milling_task_manager.run) + + return future diff --git a/src/odemis/acq/milling/patterns.py b/src/odemis/acq/milling/patterns.py new file mode 100644 index 0000000000..f7f99225da --- /dev/null +++ b/src/odemis/acq/milling/patterns.py @@ -0,0 +1,248 @@ +""" +@author: Patrick Cleeve + +Copyright © 2025 Delmic + +This file is part of Odemis. + +Odemis is free software: you can redistribute it and/or modify it under the +terms of the GNU General Public License version 2 as published by the Free +Software Foundation. + +Odemis is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +Odemis. If not, see http://www.gnu.org/licenses/. + + +### Purpose ### + +This module contains structures to define milling patterns. + +""" + +import math +from abc import ABC, abstractmethod +from typing import List + +from odemis import model + +class MillingPatternParameters(ABC): + """Represents milling pattern parameters""" + + def __init__(self, name: str): + self.name = model.StringVA(name) + + @abstractmethod + def to_dict(self) -> dict: + pass + + @staticmethod + @abstractmethod + def from_dict(data: dict): + pass + + def __repr__(self): + return f"{self.to_dict()}" + + @abstractmethod + def generate(self) -> List['MillingPatternParameters']: + """generate the milling pattern for the microscope""" + pass + + +class RectanglePatternParameters(MillingPatternParameters): + """Represents rectangle pattern parameters""" + + def __init__(self, width: float, height: float, depth: float, rotation: float = 0.0, center = (0, 0), scan_direction: str = "TopToBottom", name: str = "Rectangle"): + self.name = model.StringVA(name) + self.width = model.FloatContinuous(width, unit="m", range=(1e-9, 900e-6)) + self.height = model.FloatContinuous(height, unit="m", range=(1e-9, 900e-6)) + self.depth = model.FloatContinuous(depth, unit="m", range=(1e-9, 100e-6)) + self.rotation = model.FloatContinuous(rotation, unit="rad", range=(0, 2 * math.pi)) + self.center = model.TupleContinuous(center, unit="m", range=((-1e3, -1e3), (1e3, 1e3)), cls=(int, float)) + self.scan_direction = model.StringEnumerated(scan_direction, choices=set(["TopToBottom", "BottomToTop", "LeftToRight", "RightToLeft"])) + + def to_dict(self) -> dict: + """Convert the parameters to a json object""" + return {"name": self.name.value, + "width": self.width.value, + "height": self.height.value, + "depth": self.depth.value, + "rotation": self.rotation.value, + "center_x": self.center.value[0], + "center_y": self.center.value[1], + "scan_direction": self.scan_direction.value, + "pattern": "rectangle" + } + + @staticmethod + def from_dict(data: dict) -> 'RectanglePatternParameters': + """Create a RectanglePatternParameters object from a json object""" + return RectanglePatternParameters(width=data["width"], + height=data["height"], + depth=data["depth"], + rotation=data.get("rotation", 0), + center=(data.get("center_x", 0), data.get("center_y", 0)), + scan_direction=data.get("scan_direction", "TopToBottom"), + name=data.get("name", "Rectangle")) + + def __repr__(self) -> str: + return f"{self.to_dict()}" + + def generate(self) -> List[MillingPatternParameters]: + """Generate a list of milling shapes for the microscope. + Note: the rectangle is a pattern that is always generated as a single shape""" + return [self] + +class TrenchPatternParameters(MillingPatternParameters): + """Represents trench pattern parameters""" + + def __init__(self, width: float, height: float, depth: float, spacing: float, center = (0, 0), name: str = "Trench"): + self.name = model.StringVA(name) + self.width = model.FloatContinuous(width, unit="m", range=(1e-9, 900e-6)) + self.height = model.FloatContinuous(height, unit="m", range=(1e-9, 900e-6)) + self.depth = model.FloatContinuous(depth, unit="m", range=(1e-9, 100e-6)) + self.spacing = model.FloatContinuous(spacing, unit="m", range=(1e-9, 900e-6)) + self.center = model.TupleContinuous(center, unit="m", range=((-1e3, -1e3), (1e3, 1e3)), cls=(int, float)) + + def to_dict(self) -> dict: + """Convert the parameters to a json object""" + return {"name": self.name.value, + "width": self.width.value, + "height": self.height.value, + "depth": self.depth.value, + "spacing": self.spacing.value, + "center_x": self.center.value[0], + "center_y": self.center.value[1], + "pattern": "trench" + } + + @staticmethod + def from_dict(data: dict) -> 'TrenchPatternParameters': + """Create a TrenchPatternParameters object from a json object""" + return TrenchPatternParameters(width=data["width"], + height=data["height"], + depth=data["depth"], + spacing=data["spacing"], + center=(data.get("center_x", 0), data.get("center_y", 0)), + name=data.get("name", "Trench")) + + def __repr__(self) -> str: + return f"{self.to_dict()}" + + def generate(self) -> List[MillingPatternParameters]: + """Generate a list of milling shapes for the microscope""" + name = self.name.value + width = self.width.value + height = self.height.value + depth = self.depth.value + spacing = self.spacing.value + center = self.center.value + + # pattern center + center_x = center[0] + upper_center_y = center[1] + (height / 2 + spacing / 2) + lower_center_y = center[1] - (height / 2 + spacing / 2) + + patterns = [ + RectanglePatternParameters( + name=f"{name} (Upper)", + width=width, + height=height, + depth=depth, + rotation=0, + center = (center_x, upper_center_y), # x, y + scan_direction="TopToBottom", + ), + RectanglePatternParameters( + name=f"{name} (Lower)", + width=width, + height=height, + depth=depth, + rotation=0, + center = (center_x, lower_center_y), # x, y + scan_direction="BottomToTop", + ), + ] + + return patterns + + +class MicroexpansionPatternParameters(MillingPatternParameters): + """Represents microexpansion pattern parameters""" + + def __init__(self, width: float, height: float, depth: float, spacing: float, center = (0, 0), name: str = "Trench"): + self.name = model.StringVA(name) + self.width = model.FloatContinuous(width, unit="m", range=(1e-9, 900e-6)) + self.height = model.FloatContinuous(height, unit="m", range=(1e-9, 900e-6)) + self.depth = model.FloatContinuous(depth, unit="m", range=(1e-9, 100e-6)) + self.spacing = model.FloatContinuous(spacing, unit="m", range=(1e-9, 900e-6)) + self.center = model.TupleContinuous(center, unit="m", range=((-1e3, -1e3), (1e3, 1e3)), cls=(int, float)) + + def to_dict(self) -> dict: + """Convert the parameters to a json object""" + return {"name": self.name.value, + "width": self.width.value, + "height": self.height.value, + "depth": self.depth.value, + "spacing": self.spacing.value, + "center_x": self.center.value[0], + "center_y": self.center.value[1], + "pattern": "microexpansion" + } + + @staticmethod + def from_dict(data: dict) -> 'MicroexpansionPatternParameters': + """Create a MicroexpansionPatternParameters object from a json object""" + return MicroexpansionPatternParameters( + width=data["width"], + height=data["height"], + depth=data["depth"], + spacing=data["spacing"], + center=(data.get("center_x", 0), data.get("center_y", 0)), + name=data.get("name", "Microexpansion")) + + def __repr__(self) -> str: + return f"{self.to_dict()}" + + def generate(self) -> List[MillingPatternParameters]: + """Generate a list of milling shapes for the microscope""" + name = self.name.value + width = self.width.value + height = self.height.value + depth = self.depth.value + spacing = self.spacing.value + center_x, center_y = self.center.value + + patterns = [ + RectanglePatternParameters( + name=f"{name} (Left)", + width=width, + height=height, + depth=depth, + rotation=0, + center = (center_x - spacing, center_y), + scan_direction="TopToBottom", + ), + RectanglePatternParameters( + name=f"{name} (Right)", + width=width, + height=height, + depth=depth, + rotation=0, + center = (center_x + spacing, center_y), + scan_direction="TopToBottom", + ), + ] + + return patterns + +# dictionary to map pattern names to pattern classes +pattern_generator = { + "rectangle": RectanglePatternParameters, + "trench": TrenchPatternParameters, + "microexpansion": MicroexpansionPatternParameters, +} diff --git a/src/odemis/acq/milling/plotting.py b/src/odemis/acq/milling/plotting.py new file mode 100644 index 0000000000..23f14c7343 --- /dev/null +++ b/src/odemis/acq/milling/plotting.py @@ -0,0 +1,153 @@ +""" +@author: Patrick Cleeve + +Copyright © 2025 Delmic + +This file is part of Odemis. + +Odemis is free software: you can redistribute it and/or modify it under the +terms of the GNU General Public License version 2 as published by the Free +Software Foundation. + +Odemis is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +Odemis. If not, see http://www.gnu.org/licenses/. + + +### Purpose ### + +This module contains plotting utils related to milling tasks. + +""" +from typing import Dict, List + +import matplotlib.patches as mpatches +import matplotlib.pyplot as plt +from odemis import model +from odemis.acq.milling.patterns import ( + MicroexpansionPatternParameters, + RectanglePatternParameters, + TrenchPatternParameters, +) +from odemis.acq.milling.tasks import MillingTaskSettings + +# milling pattern colours +COLOURS = [ + "yellow","cyan", "magenta", "lime", + "orange","hotpink", "green", "blue", + "red", "purple", +] + +def _draw_trench_pattern(image: model.DataArray, params: TrenchPatternParameters, colour: str = "yellow", name: str = "Task") -> List[mpatches.Rectangle]: + # get parameters + width = params.width.value + height = params.height.value + spacing = params.spacing.value + mx, my = params.center.value + + # position in metres from image centre + pixel_size = image.metadata[model.MD_PIXEL_SIZE][0] # assume isotropic + pmx, pmy = mx / pixel_size, my / pixel_size + + # convert to image coordinates + cy, cx = image.shape[0] // 2, image.shape[1] // 2 + px = cx + pmx + py = cy - pmy + + # convert parameters to pixels + width = width / pixel_size + height = height / pixel_size + spacing = spacing / pixel_size + + rect1 = mpatches.Rectangle((px-width/2, py+spacing/2), width=width, height=height, linewidth=1, edgecolor=colour, facecolor=colour, alpha=0.3, label=f"{name}") + rect2 = mpatches.Rectangle((px-width/2, py-spacing/2-height), width=width, height=height, linewidth=1, edgecolor=colour, facecolor=colour, alpha=0.3) + + return [rect1, rect2] + + +def _draw_rectangle_pattern(image: model.DataArray, params: RectanglePatternParameters, colour: str = "yellow", name: str = "Task") -> List[mpatches.Rectangle]: + # get parameters + width = params.width.value + height = params.height.value + mx, my = params.center.value + + # position in metres from image centre + pixel_size = image.metadata[model.MD_PIXEL_SIZE][0] # assume isotropic + pmx, pmy = mx / pixel_size, my / pixel_size + + # convert to image coordinates + cy, cx = image.shape[0] // 2, image.shape[1] // 2 + px = cx + pmx + py = cy - pmy + + # convert parameters to pixels + width = width / pixel_size + height = height / pixel_size + + rect = mpatches.Rectangle((px-width/2, py+height/2), width=width, height=height, linewidth=1, edgecolor=colour, facecolor=colour, alpha=0.3, label=f"{name}") + + return [rect] + +def _draw_microexpansion_pattern(image: model.DataArray, params: MicroexpansionPatternParameters, colour: str = "yellow", name: str = "Task") -> List[mpatches.Rectangle]: + + # get parameters + width = params.width.value + height = params.height.value + spacing = params.spacing.value + mx, my = params.center.value + + # position in metres from image centre + pixel_size = image.metadata[model.MD_PIXEL_SIZE][0] # assume isotropic + pmx, pmy = mx / pixel_size, my / pixel_size + + # convert to image coordinates + cy, cx = image.shape[0] // 2, image.shape[1] // 2 + px = cx + pmx + py = cy - pmy + + # convert parameters to pixels + width = width / pixel_size + height = height / pixel_size + spacing = spacing / pixel_size + + rect1 = mpatches.Rectangle((px-spacing, py-height/2), width=width, height=height, linewidth=1, edgecolor=colour, facecolor=colour, alpha=0.3, label=f"{name}") + rect2 = mpatches.Rectangle((px+spacing-width/2, py-height/2), width=width, height=height, linewidth=1, edgecolor=colour, facecolor=colour, alpha=0.3) + + return [rect1, rect2] + +drawing_functions = { + RectanglePatternParameters: _draw_rectangle_pattern, + TrenchPatternParameters: _draw_trench_pattern, + MicroexpansionPatternParameters: _draw_microexpansion_pattern, + +} + +def draw_milling_tasks(image: model.DataArray, milling_tasks: Dict[str, MillingTaskSettings], title="Milling Tasks") -> plt.Figure: + """Draw the milling tasks on the given image using matplotlib. The patterns are drawn in different colours for each task. + This is primarily for debugging and visualisation purposes. + :param image: the image to draw the patterns on + :param milling_tasks: the milling tasks to draw + :return: the figure containing the image and patterns + """ + fig, ax = plt.subplots(1, 1, figsize=(10, 10)) + plt.imshow(image, cmap="gray") + + for i, (task_name, task) in enumerate(milling_tasks.items()): + + colour = COLOURS[i%len(COLOURS)] + for p in task.patterns: + patches = [] + + patches = drawing_functions[type(p)](image, p, colour=colour, name=task_name) + + for patch in patches: + ax.add_patch(patch) + plt.legend() + + # set title + ax.set_title(title) + + return fig diff --git a/src/odemis/acq/milling/tasks.py b/src/odemis/acq/milling/tasks.py new file mode 100644 index 0000000000..17b80dc75a --- /dev/null +++ b/src/odemis/acq/milling/tasks.py @@ -0,0 +1,115 @@ +""" +@author: Patrick Cleeve + +Copyright © 2025 Delmic + +This file is part of Odemis. + +Odemis is free software: you can redistribute it and/or modify it under the +terms of the GNU General Public License version 2 as published by the Free +Software Foundation. + +Odemis is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +Odemis. If not, see http://www.gnu.org/licenses/. + + +### Purpose ### + +This module contains structures to define milling tasks and parameters. + +""" + +import os +from typing import Dict, List + +import yaml +from odemis import model +from odemis.acq.milling.patterns import ( + MillingPatternParameters, + pattern_generator, +) + + +class MillingSettings: + """Represents milling settings for a single milling task""" + + def __init__(self, current: float, voltage: float, field_of_view: float, mode: str = "Serial", channel: str = "ion", align: bool = True): + self.current = model.FloatContinuous(current, unit="A", range=(20e-12, 120e-9)) + self.voltage = model.FloatContinuous(voltage, unit="V", range=(0, 30e3)) + self.field_of_view = model.FloatContinuous(field_of_view, unit="m", range=(50e-06, 960e-06)) + self.mode = model.StringEnumerated(mode, choices=set(["Serial", "Parallel"])) + self.channel = model.StringEnumerated(channel, choices=set(["ion"])) + self.align = model.BooleanVA(align) # align at the milling current + + def to_dict(self) -> dict: + return {"current": self.current.value, + "voltage": self.voltage.value, + "field_of_view": self.field_of_view.value, + "mode": self.mode.value, + "channel": self.channel.value, + "align": self.align.value} + + @staticmethod + def from_dict(data: dict) -> "MillingSettings": + return MillingSettings(current=data["current"], + voltage=data["voltage"], + field_of_view=data["field_of_view"], + mode=data.get("mode", "Serial"), + channel=data.get("channel", "ion"), + align=data.get("align", True) + ) + + def __repr__(self): + return f"{self.to_dict()}" + + +class MillingTaskSettings: + milling: MillingSettings + patterns: List[MillingPatternParameters] + + def __init__(self, milling: dict, patterns: List[MillingPatternParameters], name: str = "Milling Task"): + self.name = name + self.milling = milling + self.patterns = patterns + + def to_dict(self) -> dict: + return {"name": self.name, + "milling": self.milling.to_dict(), + "patterns": [pattern.to_dict() for pattern in self.patterns]} + + @staticmethod + def from_dict(data: dict): + return MillingTaskSettings( + name=data.get("name", "Milling Task"), + milling=MillingSettings.from_dict(data["milling"]), + patterns=[pattern_generator[p["pattern"]].from_dict(p) for p in data["patterns"]]) + + def __repr__(self): + return f"{self.to_dict()}" + + def generate(self) -> List[MillingPatternParameters]: + """Generate the list of invidual shapes that can be drawn on the microscope from the high-level patterns.""" + patterns = [] + for pattern in self.patterns: + patterns.extend(pattern.generate()) + return patterns + + +def save_milling_tasks(path: str, milling_tasks: Dict[str, MillingTaskSettings]): + mdict = {k: v.to_dict() for k, v in milling_tasks.items()} + with open(path, "w") as f: + yaml.dump(mdict, f) + +def load_milling_tasks(path: str) -> Dict[str, MillingTaskSettings]: + milling_tasks = {} + with open(path, "r") as f: + yaml_file = yaml.safe_load(f) + + # convert the dictionary to Dict[str, MillingTaskSettings] + milling_tasks = {k: MillingTaskSettings.from_dict(v) for k, v in yaml_file.items()} + + return milling_tasks diff --git a/src/odemis/acq/milling/test/patterns_test.py b/src/odemis/acq/milling/test/patterns_test.py new file mode 100644 index 0000000000..d9ea52a44c --- /dev/null +++ b/src/odemis/acq/milling/test/patterns_test.py @@ -0,0 +1,229 @@ +# -*- coding: utf-8 -*- +""" +@author: Patrick Cleeve + +Copyright © 2024, Delmic + +This file is part of Odemis. + +Odemis is free software: you can redistribute it and/or modify it under the terms +of the GNU General Public License version 2 as published by the Free Software +Foundation. + +Odemis is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +Odemis. If not, see http://www.gnu.org/licenses/. +""" +import logging +import unittest +import numpy +from odemis.acq.milling.patterns import RectanglePatternParameters, TrenchPatternParameters, MicroexpansionPatternParameters + +logging.basicConfig(format="%(asctime)s %(levelname)-7s %(module)-15s: %(message)s") +logging.getLogger().setLevel(logging.DEBUG) + +class RectanglePatternParametersTestCase(unittest.TestCase): + + def setUp(self): + self.name = "Rectangle-1" + self.width = 10e-6 + self.height = 10e-6 + self.depth = 10e-6 + self.rotation = 0 + self.center = (0, 0) + self.scan_direction = "TopToBottom" + + self.pattern = RectanglePatternParameters( + name=self.name, + width=self.width, + height=self.height, + depth=self.depth, + rotation=self.rotation, + center=self.center, + scan_direction=self.scan_direction, + ) + + def test_assignment(self): + + # test assignment + self.assertEqual(self.pattern.name.value, self.name) + self.assertEqual(self.pattern.width.value, self.width) + self.assertEqual(self.pattern.height.value, self.height) + self.assertEqual(self.pattern.depth.value, self.depth) + self.assertEqual(self.pattern.rotation.value, self.rotation) + self.assertEqual(self.pattern.center.value, self.center) + self.assertEqual(self.pattern.scan_direction.value, self.scan_direction) + + def test_dict(self): + # test to_dict + rectangle_pattern_dict = self.pattern.to_dict() + self.assertEqual(rectangle_pattern_dict["name"], self.name) + self.assertEqual(rectangle_pattern_dict["width"], self.width) + self.assertEqual(rectangle_pattern_dict["height"], self.height) + self.assertEqual(rectangle_pattern_dict["depth"], self.depth) + self.assertEqual(rectangle_pattern_dict["rotation"], self.rotation) + self.assertEqual(rectangle_pattern_dict["center_x"], 0) + self.assertEqual(rectangle_pattern_dict["center_y"], 0) + self.assertEqual(rectangle_pattern_dict["scan_direction"], self.scan_direction) + self.assertEqual(rectangle_pattern_dict["pattern"], "rectangle") + + # test from_dict + rectangle_pattern_from_dict = RectanglePatternParameters.from_dict(rectangle_pattern_dict) + self.assertEqual(rectangle_pattern_from_dict.name.value, self.name) + self.assertEqual(rectangle_pattern_from_dict.width.value, self.width) + self.assertEqual(rectangle_pattern_from_dict.height.value, self.height) + self.assertEqual(rectangle_pattern_from_dict.depth.value, self.depth) + self.assertEqual(rectangle_pattern_from_dict.rotation.value, self.rotation) + self.assertEqual(rectangle_pattern_from_dict.center.value, self.center) + self.assertEqual(rectangle_pattern_from_dict.scan_direction.value, self.scan_direction) + + def test_generate(self): + # test generate + patterns = self.pattern.generate() + self.assertEqual(len(patterns), 1) + self.assertEqual(patterns, [self.pattern]) + +class TrenchPatternParametersTestCase(unittest.TestCase): + + def setUp(self): + self.name = "Trench-1" + self.width = 10e-6 + self.height = 10e-6 + self.depth = 10e-6 + self.spacing = 5e-6 + self.center = (0, 0) + + self.pattern = TrenchPatternParameters( + name=self.name, + width=self.width, + height=self.height, + depth=self.depth, + spacing=self.spacing, + center=self.center, + ) + + def test_assignment(self): + + # test assignment + self.assertEqual(self.pattern.name.value, self.name) + self.assertEqual(self.pattern.width.value, self.width) + self.assertEqual(self.pattern.height.value, self.height) + self.assertEqual(self.pattern.depth.value, self.depth) + self.assertEqual(self.pattern.spacing.value, self.spacing) + self.assertEqual(self.pattern.center.value, self.center) + + def test_dict(self): + # test to_dict + trench_pattern_dict = self.pattern.to_dict() + self.assertEqual(trench_pattern_dict["name"], self.name) + self.assertEqual(trench_pattern_dict["width"], self.width) + self.assertEqual(trench_pattern_dict["height"], self.height) + self.assertEqual(trench_pattern_dict["depth"], self.depth) + self.assertEqual(trench_pattern_dict["spacing"], self.spacing) + self.assertEqual(trench_pattern_dict["center_x"], 0) + self.assertEqual(trench_pattern_dict["center_y"], 0) + self.assertEqual(trench_pattern_dict["pattern"], "trench") + + # test from_dict + trench_pattern_from_dict = TrenchPatternParameters.from_dict(trench_pattern_dict) + self.assertEqual(trench_pattern_from_dict.name.value, self.name) + self.assertEqual(trench_pattern_from_dict.width.value, self.width) + self.assertEqual(trench_pattern_from_dict.height.value, self.height) + self.assertEqual(trench_pattern_from_dict.depth.value, self.depth) + self.assertEqual(trench_pattern_from_dict.spacing.value, self.spacing) + self.assertEqual(trench_pattern_from_dict.center.value, self.center) + + def test_generate(self): + # test generate + patterns = self.pattern.generate() + self.assertEqual(len(patterns), 2) + self.assertEqual(patterns[0].name.value, f"{self.name} (Upper)") + self.assertAlmostEqual(patterns[0].width.value, self.width) + self.assertAlmostEqual(patterns[0].height.value, self.height) + self.assertAlmostEqual(patterns[0].depth.value, self.depth) + self.assertAlmostEqual(patterns[0].rotation.value, 0) + numpy.testing.assert_array_almost_equal(patterns[0].center.value, (0, (self.spacing + self.height) / 2)) + self.assertEqual(patterns[0].scan_direction.value, "TopToBottom") + + self.assertEqual(patterns[1].name.value, f"{self.name} (Lower)") + self.assertAlmostEqual(patterns[1].width.value, self.width) + self.assertAlmostEqual(patterns[1].height.value, self.height) + self.assertAlmostEqual(patterns[1].depth.value, self.depth) + self.assertAlmostEqual(patterns[1].rotation.value, 0) + numpy.testing.assert_array_almost_equal(patterns[1].center.value, (0, -(self.spacing + self.height) / 2)) + self.assertEqual(patterns[1].scan_direction.value, "BottomToTop") + + + +class MicroexpansionPatternParametersTestCase(unittest.TestCase): + + def setUp(self): + self.name = "Microexpansion-1" + self.width = 1e-6 + self.height = 10e-6 + self.depth = 5e-6 + self.spacing = 20e-6 + self.center = (0, 0) + + self.pattern = MicroexpansionPatternParameters( + name=self.name, + width=self.width, + height=self.height, + depth=self.depth, + spacing=self.spacing, + center=self.center, + ) + + def test_assignment(self): + + # test assignment + self.assertEqual(self.pattern.name.value, self.name) + self.assertEqual(self.pattern.width.value, self.width) + self.assertEqual(self.pattern.height.value, self.height) + self.assertEqual(self.pattern.depth.value, self.depth) + self.assertEqual(self.pattern.spacing.value, self.spacing) + self.assertEqual(self.pattern.center.value, self.center) + + def test_dict(self): + # test to_dict + microexpansion_pattern_dict = self.pattern.to_dict() + self.assertEqual(microexpansion_pattern_dict["name"], self.name) + self.assertEqual(microexpansion_pattern_dict["width"], self.width) + self.assertEqual(microexpansion_pattern_dict["height"], self.height) + self.assertEqual(microexpansion_pattern_dict["depth"], self.depth) + self.assertEqual(microexpansion_pattern_dict["spacing"], self.spacing) + self.assertEqual(microexpansion_pattern_dict["center_x"], 0) + self.assertEqual(microexpansion_pattern_dict["center_y"], 0) + self.assertEqual(microexpansion_pattern_dict["pattern"], "microexpansion") + + # test from_dict + microexpansion_pattern_from_dict = MicroexpansionPatternParameters.from_dict(microexpansion_pattern_dict) + self.assertEqual(microexpansion_pattern_from_dict.name.value, self.name) + self.assertEqual(microexpansion_pattern_from_dict.width.value, self.width) + self.assertEqual(microexpansion_pattern_from_dict.height.value, self.height) + self.assertEqual(microexpansion_pattern_from_dict.depth.value, self.depth) + self.assertEqual(microexpansion_pattern_from_dict.spacing.value, self.spacing) + self.assertEqual(microexpansion_pattern_from_dict.center.value, self.center) + + def test_generate(self): + # test generate + patterns = self.pattern.generate() + self.assertEqual(len(patterns), 2) + self.assertEqual(patterns[0].name.value, f"{self.name} (Left)") + self.assertAlmostEqual(patterns[0].width.value, self.width) + self.assertAlmostEqual(patterns[0].height.value, self.height) + self.assertAlmostEqual(patterns[0].depth.value, self.depth) + self.assertAlmostEqual(patterns[0].rotation.value, 0) + numpy.testing.assert_array_almost_equal(patterns[0].center.value, (-self.spacing, 0)) + self.assertEqual(patterns[0].scan_direction.value, "TopToBottom") + + self.assertEqual(patterns[1].name.value, f"{self.name} (Right)") + self.assertAlmostEqual(patterns[1].width.value, self.width) + self.assertAlmostEqual(patterns[1].height.value, self.height) + self.assertAlmostEqual(patterns[1].depth.value, self.depth) + self.assertAlmostEqual(patterns[1].rotation.value, 0) + numpy.testing.assert_array_almost_equal(patterns[1].center.value, (self.spacing, 0)) + self.assertEqual(patterns[1].scan_direction.value, "TopToBottom") diff --git a/src/odemis/acq/milling/test/tasks_test.py b/src/odemis/acq/milling/test/tasks_test.py new file mode 100644 index 0000000000..54d4826525 --- /dev/null +++ b/src/odemis/acq/milling/test/tasks_test.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +""" +@author: Patrick Cleeve + +Copyright © 2024, Delmic + +This file is part of Odemis. + +Odemis is free software: you can redistribute it and/or modify it under the terms +of the GNU General Public License version 2 as published by the Free Software +Foundation. + +Odemis is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +Odemis. If not, see http://www.gnu.org/licenses/. +""" +import os +import logging +import unittest +from odemis.acq.milling.patterns import TrenchPatternParameters, MicroexpansionPatternParameters +from odemis.acq.milling.tasks import MillingTaskSettings, MillingSettings, load_milling_tasks, save_milling_tasks + +logging.basicConfig(format="%(asctime)s %(levelname)-7s %(module)-15s: %(message)s") +logging.getLogger().setLevel(logging.DEBUG) + +TASKS_PATH = os.path.join(os.getcwd(), "milling_tasks.yaml") + +class MillingTaskTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls): + pass + + @classmethod + def tearDownClass(cls): + + if os.path.exists(TASKS_PATH): + os.remove(TASKS_PATH) + + def setUp(self): + pass + + def test_milling_settings(self): + + current = 100e-9 + voltage = 30e3 + field_of_view = 400e-6 + mode = "Serial" + channel = "ion" + milling_settings = MillingSettings(current, voltage, field_of_view, mode, channel) + + self.assertEqual(milling_settings.current.value, current) + self.assertEqual(milling_settings.voltage.value, voltage) + self.assertEqual(milling_settings.field_of_view.value, field_of_view) + self.assertEqual(milling_settings.mode.value, mode) + self.assertEqual(milling_settings.channel.value, channel) + + dict_data = milling_settings.to_dict() + self.assertEqual(dict_data["current"], current) + self.assertEqual(dict_data["voltage"], voltage) + self.assertEqual(dict_data["field_of_view"], field_of_view) + self.assertEqual(dict_data["mode"], mode) + self.assertEqual(dict_data["channel"], channel) + + milling_settings_from_dict = MillingSettings.from_dict(dict_data) + self.assertEqual(milling_settings_from_dict.current.value, current) + self.assertEqual(milling_settings_from_dict.voltage.value, voltage) + self.assertEqual(milling_settings_from_dict.field_of_view.value, field_of_view) + self.assertEqual(milling_settings_from_dict.mode.value, mode) + self.assertEqual(milling_settings_from_dict.channel.value, channel) + + def test_milling_task_settings(self): + + milling_settings = MillingSettings(100e-9, 30e3, 400e-6, "Serial", "ion") + trench_pattern = TrenchPatternParameters(1e-6, 1e-6, 100e-9, 1e-6, (0, 0)) + + milling_task_settings = MillingTaskSettings(milling_settings, [trench_pattern]) + + self.assertEqual(milling_task_settings.milling.current.value, milling_settings.current.value) + self.assertEqual(milling_task_settings.milling.voltage.value, milling_settings.voltage.value) + self.assertEqual(milling_task_settings.milling.field_of_view.value, milling_settings.field_of_view.value) + self.assertEqual(milling_task_settings.milling.mode.value, milling_settings.mode.value) + self.assertEqual(milling_task_settings.milling.channel.value, milling_settings.channel.value) + self.assertEqual(milling_task_settings.patterns[0].width.value, trench_pattern.width.value) + self.assertEqual(milling_task_settings.patterns[0].height.value, trench_pattern.height.value) + self.assertEqual(milling_task_settings.patterns[0].depth.value, trench_pattern.depth.value) + self.assertEqual(milling_task_settings.patterns[0].spacing.value, trench_pattern.spacing.value) + self.assertEqual(milling_task_settings.patterns[0].center.value, trench_pattern.center.value) + + dict_data = milling_task_settings.to_dict() + self.assertEqual(dict_data["name"], "Milling Task") + self.assertEqual(dict_data["milling"], milling_settings.to_dict()) + self.assertEqual(dict_data["patterns"][0], trench_pattern.to_dict()) + + milling_task_settings_from_dict = MillingTaskSettings.from_dict(dict_data) + self.assertEqual(milling_task_settings_from_dict.milling.current.value, milling_settings.current.value) + self.assertEqual(milling_task_settings_from_dict.milling.voltage.value, milling_settings.voltage.value) + self.assertEqual(milling_task_settings_from_dict.milling.field_of_view.value, milling_settings.field_of_view.value) + self.assertEqual(milling_task_settings_from_dict.milling.mode.value, milling_settings.mode.value) + self.assertEqual(milling_task_settings_from_dict.milling.channel.value, milling_settings.channel.value) + self.assertEqual(milling_task_settings_from_dict.patterns[0].width.value, trench_pattern.width.value) + self.assertEqual(milling_task_settings_from_dict.patterns[0].height.value, trench_pattern.height.value) + self.assertEqual(milling_task_settings_from_dict.patterns[0].depth.value, trench_pattern.depth.value) + self.assertEqual(milling_task_settings_from_dict.patterns[0].spacing.value, trench_pattern.spacing.value) + self.assertEqual(milling_task_settings_from_dict.patterns[0].center.value, trench_pattern.center.value) + + def test_save_load_task_settings(self): + + milling_settings = MillingSettings(100e-9, 30e3, 400e-6, "Serial", "ion") + trench_pattern = TrenchPatternParameters(10e-6, 3e-6, 100e-9, 2e-6, (0, 0)) + trench_task_settings = MillingTaskSettings(milling_settings, [trench_pattern]) + + milling_settings = MillingSettings(100e-9, 30e3, 400e-6, "Serial", "ion") + microexpansion_pattern = MicroexpansionPatternParameters(1e-6, 10e-6, 100e-9, 10e-6, (0, 0)) + microexpansion_task_settings = MillingTaskSettings(milling_settings, [microexpansion_pattern]) + + tasks = {"Trench": trench_task_settings, "Microexpansion": microexpansion_task_settings} + + # save and load the tasks + save_milling_tasks(path=TASKS_PATH, milling_tasks=tasks) + loaded_tasks = load_milling_tasks(TASKS_PATH) + + self.assertTrue("Trench" in loaded_tasks) + self.assertTrue("Microexpansion" in loaded_tasks) + + self.assertEqual(loaded_tasks["Trench"].milling.current.value, trench_task_settings.milling.current.value) + self.assertEqual(loaded_tasks["Trench"].milling.voltage.value, trench_task_settings.milling.voltage.value) + self.assertEqual(loaded_tasks["Trench"].milling.field_of_view.value, trench_task_settings.milling.field_of_view.value) + self.assertEqual(loaded_tasks["Trench"].milling.mode.value, trench_task_settings.milling.mode.value) + self.assertEqual(loaded_tasks["Trench"].milling.channel.value, trench_task_settings.milling.channel.value) + self.assertEqual(loaded_tasks["Trench"].patterns[0].width.value, trench_task_settings.patterns[0].width.value) + self.assertEqual(loaded_tasks["Trench"].patterns[0].height.value, trench_task_settings.patterns[0].height.value) + self.assertEqual(loaded_tasks["Trench"].patterns[0].depth.value, trench_task_settings.patterns[0].depth.value) + self.assertEqual(loaded_tasks["Trench"].patterns[0].spacing.value, trench_task_settings.patterns[0].spacing.value) + self.assertEqual(loaded_tasks["Trench"].patterns[0].center.value, trench_task_settings.patterns[0].center.value) + + self.assertEqual(loaded_tasks["Microexpansion"].milling.current.value, microexpansion_task_settings.milling.current.value) + self.assertEqual(loaded_tasks["Microexpansion"].milling.voltage.value, microexpansion_task_settings.milling.voltage.value) + self.assertEqual(loaded_tasks["Microexpansion"].milling.field_of_view.value, microexpansion_task_settings.milling.field_of_view.value) + self.assertEqual(loaded_tasks["Microexpansion"].milling.mode.value, microexpansion_task_settings.milling.mode.value) + self.assertEqual(loaded_tasks["Microexpansion"].milling.channel.value, microexpansion_task_settings.milling.channel.value) + self.assertEqual(loaded_tasks["Microexpansion"].patterns[0].width.value, microexpansion_task_settings.patterns[0].width.value) + self.assertEqual(loaded_tasks["Microexpansion"].patterns[0].height.value, microexpansion_task_settings.patterns[0].height.value) + self.assertEqual(loaded_tasks["Microexpansion"].patterns[0].depth.value, microexpansion_task_settings.patterns[0].depth.value) + self.assertEqual(loaded_tasks["Microexpansion"].patterns[0].spacing.value, microexpansion_task_settings.patterns[0].spacing.value) + self.assertEqual(loaded_tasks["Microexpansion"].patterns[0].center.value, microexpansion_task_settings.patterns[0].center.value) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/odemis/driver/autoscript_client.py b/src/odemis/driver/autoscript_client.py index b160636101..a801d024d9 100644 --- a/src/odemis/driver/autoscript_client.py +++ b/src/odemis/driver/autoscript_client.py @@ -1032,9 +1032,10 @@ def set_patterning_mode(self, mode: str) -> None: self.server.set_patterning_mode(mode) def clear_patterns(self) -> None: - """Clear all patterns.""" + """Clear all patterns in fib. NOTE: active_view 2 is the fib view""" with self._proxy_access: self.server._pyroClaimOwnership() + self.server.set_active_view(2) # channel = ion self.server.clear_patterns() def set_default_application_file(self, application_file: str = "Si") -> None: From 8f67851040e3209b7fa78b543b95ab0ed7d931be Mon Sep 17 00:00:00 2001 From: Patrick Cleeve Date: Fri, 21 Feb 2025 12:59:53 +1100 Subject: [PATCH 2/2] [feat] add openfibsem milling integration --- src/odemis/acq/milling/openfibsem.py | 239 ++++++++++++++++++ .../milling/test/openfibsem_millmng_test.py | 85 +++++++ .../acq/milling/test/openfibsem_test.py | 212 ++++++++++++++++ 3 files changed, 536 insertions(+) create mode 100644 src/odemis/acq/milling/openfibsem.py create mode 100644 src/odemis/acq/milling/test/openfibsem_millmng_test.py create mode 100644 src/odemis/acq/milling/test/openfibsem_test.py diff --git a/src/odemis/acq/milling/openfibsem.py b/src/odemis/acq/milling/openfibsem.py new file mode 100644 index 0000000000..01256dbf06 --- /dev/null +++ b/src/odemis/acq/milling/openfibsem.py @@ -0,0 +1,239 @@ + +import logging +import math +import os +import threading +import time +from concurrent import futures +from concurrent.futures._base import CANCELLED, FINISHED, RUNNING, CancelledError +from typing import Dict, List, Optional + +import sys +sys.path.append("/home/patrick/development/openfibsem/fibsem") +sys.path.append(f"{os.path.expanduser('~')}/development/fibsem") + +from fibsem import utils +from fibsem.microscopes.odemis_microscope import OdemisMicroscope +from fibsem.milling import FibsemMillingStage, estimate_total_milling_time, mill_stages, MillingAlignment +from fibsem.milling.patterning.patterns2 import ( + BasePattern, + MicroExpansionPattern, + RectanglePattern, + TrenchPattern, +) +from fibsem.structures import FibsemMillingSettings, Point +from fibsem.utils import load_microscope_configuration +from odemis import model +from odemis.acq.milling.patterns import ( + MicroexpansionPatternParameters, + MillingPatternParameters, + RectanglePatternParameters, + TrenchPatternParameters, +) +from odemis.acq.milling.tasks import ( + MillingSettings, + MillingTaskSettings, +) +from odemis.util import executeAsyncTask + +def create_openfibsem_microscope() -> OdemisMicroscope: + """Create an openfibsem microscope instance with the current microscope configuration.""" + + # TODO: extract the rest of the required metadata + # TODO: create tescan compatible version + + # stage metadata + stage_bare = model.getComponent(role="stage-bare") + stage_md = stage_bare.getMetadata() + pre_tilt = stage_md[model.MD_CALIB].get(model.MD_SAMPLE_PRE_TILT, math.radians(35)) + rotation_reference = stage_md[model.MD_FAV_SEM_POS_ACTIVE]["rz"] + + # loads the default config + config = load_microscope_configuration() + config.system.stage.shuttle_pre_tilt = math.degrees(pre_tilt) + config.system.stage.rotation_reference = math.degrees(rotation_reference) + config.system.stage.rotation_180 = math.degrees(rotation_reference + math.pi) + microscope = OdemisMicroscope(config.system) + + return microscope + +def convert_pattern(p: MillingPatternParameters) -> BasePattern: + """Convert from an odemis pattern to an openfibsem pattern""" + if isinstance(p, RectanglePatternParameters): + return _convert_rectangle_pattern(p) + + if isinstance(p, TrenchPatternParameters): + return _convert_trench_pattern(p) + + if isinstance(p, MicroexpansionPatternParameters): + return _convert_microexpansion_pattern(p) + +def _convert_rectangle_pattern(p: RectanglePatternParameters) -> RectanglePattern: + return RectanglePattern( + width=p.width.value, + height=p.height.value, + depth=p.depth.value, + rotation=p.rotation.value, + scan_direction=p.scan_direction.value, + point=Point(x=p.center.value[0], y=p.center.value[1]) + ) + +def _convert_trench_pattern(p: TrenchPatternParameters) -> TrenchPattern: + + return TrenchPattern( + width=p.width.value, + upper_trench_height=p.height.value, + lower_trench_height=p.height.value, + spacing=p.spacing.value, + depth=p.depth.value, + point=Point(x=p.center.value[0], y=p.center.value[1]) + ) + +def _convert_microexpansion_pattern(p: MicroexpansionPatternParameters) -> MicroExpansionPattern: + return MicroExpansionPattern( + width=p.width.value, + height=p.height.value, + depth=p.depth.value, + distance=p.spacing.value, + point=Point(x=p.center.value[0], y=p.center.value[1]) + ) + +def convert_milling_settings(s: MillingSettings) -> FibsemMillingSettings: + """Convert from an odemis milling settings to an openfibsem milling settings""" + return FibsemMillingSettings( + milling_current=s.current.value, + milling_voltage=s.voltage.value, + patterning_mode=s.mode.value, + hfw=s.field_of_view.value, + ) + +# task converter +def convert_task_to_milling_stage(task: MillingTaskSettings) -> FibsemMillingStage: + """Convert from an odemis milling task to an openfibsem milling stage""" + s = convert_milling_settings(task.milling) + p = convert_pattern(task.patterns[0]) + a = MillingAlignment(enabled=task.milling.align.value) + + milling_stage = FibsemMillingStage( + name=task.name, + milling=s, + pattern=p, + alignment=a, + ) + return milling_stage + +def convert_milling_tasks_to_milling_stages(milling_tasks: List[MillingTaskSettings]) -> List[FibsemMillingStage]: + """Convert from odemis milling tasks to openfibsem milling stages""" + milling_stages = [] + + if isinstance(milling_tasks, dict): + milling_tasks = list(milling_tasks.values()) + + for task in milling_tasks: + milling_stage = convert_task_to_milling_stage(task) + milling_stages.append(milling_stage) + + return milling_stages + +class OpenFIBSEMMillingTaskManager: + """This class manages running milling tasks via openfibsem.""" + + def __init__(self, future: futures.Future, + tasks: List[MillingTaskSettings], + path: Optional[str] = None): + """ + :param future: the future that will be executing the task + :param tasks: The milling tasks to run (in order) + :param path: The path to save the images (optional) + """ + # create microscope connection + self.microscope = create_openfibsem_microscope() + if path is None: + path = os.getcwd() + self.microscope._last_imaging_settings.path = path # note: image acquisition post-milling is not yet supported via odemis + + # convert the tasks to milling stages + self.tasks = tasks + self.milling_stages = convert_milling_tasks_to_milling_stages(self.tasks) + + self._future = future + if future is not None: + self._future.running_subf = model.InstantaneousFuture() + self._future._task_lock = threading.Lock() + + def cancel(self, future: futures.Future) -> bool: + """ + Canceler of acquisition task. + :param future: the future that will be executing the task + :return: True if it successfully cancelled (stopped) the future + """ + logging.debug("Canceling milling procedure...") + + with future._task_lock: + if future._task_state == FINISHED: + return False + future._task_state = CANCELLED + future.running_subf.cancel() + self.microscope.stop_milling() + logging.debug("Milling procedure cancelled.") + return True + + def estimate_milling_time(self) -> float: + """ + Estimates the milling time for the given patterns. + :return: (float > 0): the estimated time is in seconds + """ + return estimate_total_milling_time(self.milling_stages) + + def run_milling(self, stage: FibsemMillingStage) -> None: + """Run the milling tasks via openfibsem + :param stage: the milling stage to run""" + mill_stages(self.microscope, [stage]) + + def run(self): + """ + The main function of the task class, which will be called by the future asynchronously + """ + self._future._task_state = RUNNING + + # TODO: connect the progress signal + try: + for stage in self.milling_stages: + with self._future._task_lock: + if self._future._task_state == CANCELLED: + raise CancelledError() + + logging.info(f"Running milling stage: {stage.name}") + self.run_milling(stage=stage) + except CancelledError: + logging.debug("Stopping because milling was cancelled") + raise + except Exception: + logging.exception("The milling failed") + raise + finally: + self._future._task_state = FINISHED + + +def run_milling_tasks_openfibsem(tasks: List[MillingTaskSettings], + path: Optional[str] = None) -> futures.Future: + """ + Run multiple milling tasks in order via openfibsem. + :param tasks: List of milling tasks to be executed in order. + :param path: The path to save the images + :return: ProgressiveFuture + """ + # Create a progressive future with running sub future + future = model.ProgressiveFuture() + # create milling task + millmng = OpenFIBSEMMillingTaskManager(future, tasks, path) + # add the ability of cancelling the future during execution + future.task_canceller = millmng.cancel + + # set the progress of the future + future.set_end_time(time.time() + millmng.estimate_milling_time() + 30) + + # assign the acquisition task to the future + executeAsyncTask(future, millmng.run) + + return future diff --git a/src/odemis/acq/milling/test/openfibsem_millmng_test.py b/src/odemis/acq/milling/test/openfibsem_millmng_test.py new file mode 100644 index 0000000000..ada6d53528 --- /dev/null +++ b/src/odemis/acq/milling/test/openfibsem_millmng_test.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +""" +Created on Feb 2025 + +Copyright © Delmic + +This file is part of Odemis. + +Odemis is free software: you can redistribute it and/or modify it under the terms +of the GNU General Public License version 2 as published by the Free Software +Foundation. + +Odemis is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +Odemis. If not, see http://www.gnu.org/licenses/. +""" +import glob +import logging +import os +import unittest +import time +import odemis +from odemis import model +from odemis.acq.milling.tasks import load_milling_tasks, __file__ as MILLING_PATH +from odemis.acq.milling.openfibsem import run_milling_tasks_openfibsem, OpenFIBSEMMillingTaskManager + +from odemis.util import testing + +logging.getLogger().setLevel(logging.DEBUG) +logging.basicConfig(format="%(asctime)s %(levelname)-7s %(module)s:%(lineno)d %(message)s") + +CONFIG_PATH = os.path.dirname(odemis.__file__) + "/../../install/linux/usr/share/odemis/" +METEOR_FISBEM_CONFIG = CONFIG_PATH + "sim/meteor-fibsem-sim.odm.yaml" +METEOR_FISBEM_CONFIG = "/home/patrick/development/odemis/install/linux/usr/share/odemis/sim/meteor-fibsem-sim.odm.yaml" +MILLING_TASKS_PATH = os.path.join(os.path.dirname(MILLING_PATH), "milling_tasks.yaml") + +class TestOpenFIBSEMMillingManager(unittest.TestCase): + + """ + Test the OpenFIBSEM Milling Manager + Requires the xt simulator to be running + """ + MIC_CONFIG = METEOR_FISBEM_CONFIG + + + @classmethod + def setUpClass(cls): + testing.start_backend(cls.MIC_CONFIG) + cls.microscope = model.getMicroscope() + cls.milling_tasks = load_milling_tasks(MILLING_TASKS_PATH) + + def test_estimate_total_milling_time(self): + """Test the estimate_total_milling_time function""" + openfibsem_milling_manager = OpenFIBSEMMillingTaskManager(None, + self.milling_tasks) + + # check that the estimated time is greater than 0 + estimated_time = openfibsem_milling_manager.estimate_milling_time() + self.assertGreater(estimated_time, 0) + + def test_openfibsem_milling_manager(self): + """Test the OpenFIBSEMMillingManager""" + tasks = self.milling_tasks + + f = run_milling_tasks_openfibsem(tasks) + f.result() + + # check workflow finished + self.assertTrue(f.done()) + self.assertFalse(f.cancelled()) + + def test_cancel_milling(self): + """Test cancel milling tasks""" + tasks = self.milling_tasks + + f = run_milling_tasks_openfibsem(tasks) + + time.sleep(5) + f.cancel() + + self.assertTrue(f.cancelled()) + self.assertTrue(f.done()) diff --git a/src/odemis/acq/milling/test/openfibsem_test.py b/src/odemis/acq/milling/test/openfibsem_test.py new file mode 100644 index 0000000000..f5ef31894e --- /dev/null +++ b/src/odemis/acq/milling/test/openfibsem_test.py @@ -0,0 +1,212 @@ +# -*- coding: utf-8 -*- +""" +Created on Feb 2025 + +Copyright © Delmic + +This file is part of Odemis. + +Odemis is free software: you can redistribute it and/or modify it under the terms +of the GNU General Public License version 2 as published by the Free Software +Foundation. + +Odemis is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +Odemis. If not, see http://www.gnu.org/licenses/. +""" +import logging +import unittest + +from odemis.acq.milling import openfibsem # to load the openfibsem module + +from fibsem.milling import MillingAlignment +from fibsem.milling.patterning.patterns2 import ( + BasePattern, + MicroExpansionPattern, + RectanglePattern, + TrenchPattern, +) +from fibsem.structures import Point +from odemis.acq.milling.openfibsem import ( + convert_milling_settings, + convert_milling_tasks_to_milling_stages, + convert_pattern, + convert_task_to_milling_stage, +) +from odemis.acq.milling.patterns import ( + MicroexpansionPatternParameters, + RectanglePatternParameters, + TrenchPatternParameters, +) +from odemis.acq.milling.tasks import MillingSettings, MillingTaskSettings + +logging.basicConfig(format="%(asctime)s %(levelname)-7s %(module)-15s: %(message)s") +logging.getLogger().setLevel(logging.DEBUG) + +# Create dummy parameter objects to pass into converter functions. +def create_rectangle_pattern_params(): + return RectanglePatternParameters( + name="Rectangle-1", + width=10e-6, + height=15e-6, + depth=5e-6, + rotation=0, + center=(100, 150), + scan_direction="TopToBottom", + ) + +def create_trench_pattern_params(): + return TrenchPatternParameters( + name="Trench-1", + width=12e-6, + height=8e-6, + depth=4e-6, + spacing=3e-6, + center=(50, 75), + ) + +def create_microexpansion_pattern_params(): + return MicroexpansionPatternParameters( + name="Microexpansion-1", + width=5e-6, + height=10e-6, + depth=3e-6, + spacing=7e-6, + center=(25, 35), + ) + +class TestConvertPatterns(unittest.TestCase): + + def test_convert_rectangle_pattern(self): + pattern_param = create_rectangle_pattern_params() + converted = convert_pattern(pattern_param) + self.assertIsInstance(converted, RectanglePattern) + self.assertAlmostEqual(converted.width, pattern_param.width.value) + self.assertAlmostEqual(converted.height, pattern_param.height.value) + self.assertAlmostEqual(converted.depth, pattern_param.depth.value) + self.assertAlmostEqual(converted.rotation, pattern_param.rotation.value) + self.assertEqual(converted.scan_direction, pattern_param.scan_direction.value) + self.assertEqual(converted.point, Point(x=pattern_param.center.value[0], + y=pattern_param.center.value[1])) + + def test_convert_trench_pattern(self): + pattern_param = create_trench_pattern_params() + converted = convert_pattern(pattern_param) + self.assertIsInstance(converted, TrenchPattern) + self.assertAlmostEqual(converted.width, pattern_param.width.value) + # Both upper and lower trench heights should be equal to pattern_param.height.value + self.assertAlmostEqual(converted.upper_trench_height, pattern_param.height.value) + self.assertAlmostEqual(converted.lower_trench_height, pattern_param.height.value) + self.assertAlmostEqual(converted.depth, pattern_param.depth.value) + self.assertAlmostEqual(converted.spacing, pattern_param.spacing.value) + self.assertEqual(converted.point, Point(x=pattern_param.center.value[0], + y=pattern_param.center.value[1])) + + def test_convert_microexpansion_pattern(self): + pattern_param = create_microexpansion_pattern_params() + converted = convert_pattern(pattern_param) + self.assertIsInstance(converted, MicroExpansionPattern) + self.assertAlmostEqual(converted.width, pattern_param.width.value) + self.assertAlmostEqual(converted.height, pattern_param.height.value) + self.assertAlmostEqual(converted.depth, pattern_param.depth.value) + self.assertAlmostEqual(converted.distance, pattern_param.spacing.value) + self.assertEqual(converted.point, Point(x=pattern_param.center.value[0], + y=pattern_param.center.value[1])) + +class TestConvertMillingSettings(unittest.TestCase): + + def test_convert_milling_settings(self): + dummy_settings = MillingSettings( + current=1e-9, + voltage=30000, + mode="Serial", + field_of_view=80e-6, + align=True + ) + converted = convert_milling_settings(dummy_settings) + # Validate that the converted settings match + self.assertAlmostEqual(converted.milling_current, dummy_settings.current.value) + self.assertAlmostEqual(converted.milling_voltage, dummy_settings.voltage.value) + self.assertEqual(converted.patterning_mode, dummy_settings.mode.value) + self.assertAlmostEqual(converted.hfw, dummy_settings.field_of_view.value) + +class TestConvertTaskToMillingStage(unittest.TestCase): + + def test_convert_task_to_milling_stage(self): + dummy_milling = MillingSettings( + current=1e-9, + voltage=30000, + mode="Serial", + field_of_view=80e-6, + align=True + ) + # Create a rectangle pattern parameter instance + pattern_param = create_rectangle_pattern_params() + + # Create a dummy task with a name, milling settings, and a single pattern. + dummy_task = MillingTaskSettings( + name="Task-1", + milling=dummy_milling, + patterns=[pattern_param], + ) + + stage = convert_task_to_milling_stage(dummy_task) + # Check that stage has been constructed correctly. + self.assertEqual(stage.name, dummy_task.name) + # Check milling settings conversion + self.assertAlmostEqual(stage.milling.milling_current, dummy_milling.current.value) + self.assertAlmostEqual(stage.milling.milling_voltage, dummy_milling.voltage.value) + + # Check pattern conversion: since we passed a rectangle, expect RectanglePattern output. + self.assertIsInstance(stage.pattern, RectanglePattern) + + # Check alignment conversion; alignment.enabled should reflect dummy_milling.align.value. + self.assertIsInstance(stage.alignment, MillingAlignment) + self.assertEqual(stage.alignment.enabled, dummy_milling.align.value) + +class TestConvertMillingTasksToMillingStages(unittest.TestCase): + + def test_convert_milling_tasks_to_milling_stages(self): + # Create two dummy tasks. + dummy_milling1 = MillingSettings( + current=1e-9, + voltage=30000, + mode="Serial", + field_of_view=80e-6, + align=False + ) + dummy_milling2 = MillingSettings( + current=2e-9, + voltage=5000, + mode="Parallel", + field_of_view=150e-6, + align=True + ) + pattern_param1 = create_trench_pattern_params() + pattern_param2 = create_microexpansion_pattern_params() + + task1 = MillingTaskSettings( + name="Task-1", + milling=dummy_milling1, + patterns=[pattern_param1], + ) + task2 = MillingTaskSettings( + name="Task-2", + milling=dummy_milling2, + patterns=[pattern_param2], + ) + tasks = [task1, task2] + stages = convert_milling_tasks_to_milling_stages(tasks) + self.assertEqual(len(stages), 2) + # Check names and basic settings of each stage + self.assertEqual(stages[0].name, task1.name) + self.assertEqual(stages[1].name, task2.name) + # Check that each stage has a valid pattern conversion + self.assertIsInstance(stages[0].pattern, BasePattern) + self.assertIsInstance(stages[1].pattern, BasePattern) + +if __name__ == "__main__": + unittest.main()