Skip to content

Commit

Permalink
Merge branch 'meteor-960-automated-workflow-manager' into meteor-1100…
Browse files Browse the repository at this point in the history
…-fibsem-tab
  • Loading branch information
patrickcleeve2 committed Feb 21, 2025
2 parents e9e33ab + 2fb885a commit 9e9e105
Show file tree
Hide file tree
Showing 3 changed files with 287 additions and 18 deletions.
298 changes: 286 additions & 12 deletions src/odemis/acq/milling/millmng.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,40 @@
"""
import logging
import os
import threading
import time
from concurrent.futures._base import (
CANCELLED,
FINISHED,
RUNNING,
CancelledError,
Future,
)
from typing import List
from concurrent.futures import Future
from concurrent.futures._base import CANCELLED, FINISHED, RUNNING, CancelledError
from datetime import datetime
from enum import Enum
from typing import Dict, List


from odemis import model
from odemis.acq.acqmng import acquire
from odemis.acq.drift import align_reference_image
from odemis.acq.milling.patterns import (
RectanglePatternParameters,
from odemis.acq.feature import (
FEATURE_DEACTIVE,
FEATURE_POLISHED,
FEATURE_ROUGH_MILLED,
FEATURE_ACTIVE,
FEATURE_READY_TO_MILL,
CryoFeature,
REFERENCE_IMAGE_FILENAME,
)
from odemis.acq.milling.tasks import MillingTaskSettings
from odemis.acq.stream import FIBStream
from odemis.acq.milling.patterns import RectanglePatternParameters
from odemis.acq.move import (
MILLING,
POSITION_NAMES,
SEM_IMAGING,
MicroscopePostureManager,
)
from odemis.acq.stream import FIBStream, SEMStream
from odemis.dataio import find_fittest_converter
from odemis.util import executeAsyncTask

from odemis.util.dataio import open_acquisition

class TFSMillingTaskManager:
"""This class manages running milling tasks."""
Expand Down Expand Up @@ -219,3 +232,264 @@ def run_milling_tasks(tasks: List[MillingTaskSettings], fib_stream: FIBStream) -
executeAsyncTask(future, milling_task_manager.run)

return future

class MillingWorkflowTask(Enum):
RoughMilling = "Rough Milling"
Polishing = "Polishing"

status_map: Dict[MillingWorkflowTask, str] = {
MillingWorkflowTask.RoughMilling: FEATURE_ROUGH_MILLED,
MillingWorkflowTask.Polishing: FEATURE_POLISHED,
}
def get_associated_tasks(wt: MillingWorkflowTask,
milling_tasks: Dict[str, MillingTaskSettings]) -> List[MillingTaskSettings]:
"""Get the milling tasks associated with the given workflow task.
:param wt: The workflow task to get associated tasks for.
:param milling_tasks: The dictionary of all milling tasks.
:return: List of associated tasks."""
associated_tasks = []
for task in milling_tasks.values():

if wt.value in task.name:
associated_tasks.append(task)

# special case for micro-expansion to associate with rough milling
if wt is MillingWorkflowTask.RoughMilling and "Microexpansion" in task.name:
associated_tasks.insert(0, task) # should be the first task

return associated_tasks

class AutomatedMillingManager(object):

def __init__(self,
future: Future,
features: List[CryoFeature],
stage: model.Actuator,
sem_stream: SEMStream,
fib_stream: FIBStream,
task_list: List[MillingWorkflowTask],
):

self.stage = stage
self.sem_stream = sem_stream
self.fib_stream = fib_stream
self.ion_beam = fib_stream.emitter
self.features = features
self.task_list = task_list
self._exporter = find_fittest_converter("filename.ome.tiff")
self.pm = MicroscopePostureManager(model.getMicroscope())
self._prefix: str = ""

self._future = future
if future is not None:
self._future.running_subf = model.InstantaneousFuture()
self._future._task_lock = threading.Lock()

def cancel(self, future: Future) -> bool:
"""
Canceler of milling task.
:param future: the future that will be executing the task
:return: True if it successfully cancelled (stopped) the future
"""
logging.debug("Canceling milling procedure...")

with future._task_lock:
if future._task_state == FINISHED:
return False
future._task_state = CANCELLED
future.running_subf.cancel()
logging.debug("Milling procedure cancelled.")
return True

def run(self):
self._future._task_state = RUNNING

for task_num, workflow_task in enumerate(self.task_list, 1):

self.current_workflow = workflow_task.value
logging.info(f"Starting {task_num}/{len(self.task_list)}: {self.current_workflow} for {len(self.features)} features...")

current_posture = self.pm.getCurrentPostureLabel()
if current_posture not in [SEM_IMAGING, MILLING]:
raise ValueError(f"Current posture is {POSITION_NAMES[current_posture]}. "
"Please switch to SEM_IMAGING or MILLING before starting automated milling.")

for feature in self.features:

if feature.status.value == FEATURE_DEACTIVE:
logging.info(f"Skipping {feature.name.value} as it is deactivated.")
continue

if feature.status.value == FEATURE_ACTIVE:
logging.info(f"Skipping {feature.name.value} as it is not ready for milling.")
continue

# prefix for images
self._prefix = f"{feature.name.value}-{self.current_workflow}"

self._future.msg = f"{feature.name.value}: Starting {self.current_workflow}"
self._future.current_feature = feature
self._future.set_progress()

logging.info(f"Starting {self.current_workflow} for {feature.name.value}, status: {feature.status.value}")

############# STAGE MOVEMENT #############
self._move_to_milling_position(feature)

############# ALIGNMENT #############
self._align_reference_image(feature)

############# MILLLING #############
self._run_milling_tasks(feature, workflow_task)

############# REFERENCE IMAGING #############
self._acquire_reference_images(feature)

# update status
feature.status.value = status_map[workflow_task]

logging.info(f"Finished {self.current_workflow} for {feature.name.value}")

# TODO: implement fm imaging between workflow tasks
# TODO: configuraable settings for sem/fib imaging
# TODO: configurable workflow for flm imaging

def check_cancelled(self):
with self._future._task_lock:
if self._future.cancelled() == CANCELLED:
raise CancelledError()

def _move_to_milling_position(self, feature: CryoFeature) -> None:

self.check_cancelled()
self._future.msg = f"{feature.name.value}: Moving to Milling Position"
self._future.set_progress()

# milling position
stage_position = feature.get_posture_position(MILLING)

# move to position
self._future.running_subf = self.stage.moveAbs(stage_position)
self._future.running_subf.result()

self._future.msg = f"Moved to {feature.name.value}"
self._future.set_progress()

def _run_milling_tasks(self, feature: CryoFeature, workflow_task: MillingWorkflowTask) -> None:
"""Run the milling tasks for the given feature and the workflow."""
# get milling tasks
milling_tasks = get_associated_tasks(
wt=workflow_task,
milling_tasks=feature.milling_tasks)

self._future.msg = f"{feature.name.value}: Milling: {self.current_workflow}"
self._future.set_progress()

self._future.running_subf = run_milling_tasks(milling_tasks)
self._future.running_subf.result()

def _align_reference_image(self, feature: CryoFeature) -> None:
"""Align the reference image to the current image using beam shift."""

self.check_cancelled()
self._future.msg = f"{feature.name.value}: Aligning Reference Image"
self._future.set_progress()

# reset beam shift
self.ion_beam.shift.value = (0, 0)

# match image settings for alignment
ref_image = feature.reference_image # load from directory?
if ref_image is None:
filename = f"{feature.name.value}-{REFERENCE_IMAGE_FILENAME}"
ref_image = open_acquisition(os.path.join(feature.path, filename))[0].getData()
if ref_image is None:
raise ValueError("Reference image not found.")
pixel_size = ref_image.metadata[model.MD_PIXEL_SIZE]
fov = pixel_size[0] * ref_image.shape[1]
self.ion_beam.horizontalFoV.value = fov
self.ion_beam.resolution.value = ref_image.shape[::-1]

# beam shift alignment
self._future.running_subf = acquire([self.fib_stream])
data, _ = self._future.running_subf.result()
new_image = data[0]

# roll data by a random amount (for simulation)
# import random
# x, y = random.randint(0, 100), random.randint(0, 100)
# new_image = numpy.roll(new_image, [x, y], axis=[0, 1])
# logging.debug(f"Shifted image by {x}, {y} pixels")

align_filename = self.get_filename(feature, "Pre-Alignment-FIB")
self._exporter.export(align_filename, new_image)

align_reference_image(ref_image, new_image, scanner=self.ion_beam)

# save post-alignment image
self._future.running_subf = acquire([self.fib_stream])
data, _ = self._future.running_subf.result()
new_image = data[0]

align_filename = self.get_filename(feature, "Post-Alignment-FIB")
self._exporter.export(align_filename, new_image)

def _acquire_reference_images(self, feature: CryoFeature) -> None:
self.check_cancelled()

self._future.msg = f"{feature.name.value}: Acquiring Reference Images"
self._future.set_progress()

# acquire images
self._future.running_subf = acquire([self.sem_stream, self.fib_stream])
data, ex = self._future.running_subf.result()
sem_image, fib_image = data

# save images
sem_filename = self.get_filename(feature, "Finished-SEM")
fib_filename = self.get_filename(feature, "Finished-FIB")
self._exporter.export(sem_filename, sem_image)
self._exporter.export(fib_filename, fib_image)

def get_filename(self, feature: CryoFeature, basename: str) -> str:
"""Get a unique filename for the given feature and basename.
:param feature: The feature to get the filename for.
:param basename: The basename of the filename.
:return: The full filename."""
ts = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
filename = f"{self._prefix}-{basename}-{ts}.ome.tiff".replace(" ", "-")
return os.path.join(os.path.join(feature.path, filename))

def run_automated_milling(features: List[CryoFeature],
stage: model.Actuator,
sem_stream: SEMStream,
fib_stream: FIBStream,
task_list: List[MillingWorkflowTask],
) -> Future:
"""
Automatically mill and image a list of features.
:return: ProgressiveFuture
"""
# Create a progressive future with running sub future
future = model.ProgressiveFuture()
# create automated milling task
amm = AutomatedMillingManager(
future=future,
stage=stage,
sem_stream=sem_stream,
fib_stream=fib_stream,
task_list=task_list,
features=features,
)
# add the ability of cancelling the future during execution
future.task_canceller = amm.cancel

# set the progress of the future
total_duration = len(task_list) * len(features) * 30
future.set_end_time(time.time() + total_duration) # TODO: get proper time estimate from openfibsem

# assign the acquisition task to the future
executeAsyncTask(future, amm.run)

return future
2 changes: 1 addition & 1 deletion src/odemis/acq/milling/test/millmng_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
FEATURE_DEACTIVE,
)
from odemis.acq.milling.tasks import load_milling_tasks
from odemis.acq.millmng2 import MillingWorkflowTask, run_automated_milling, status_map
from odemis.acq.milling.millmng import MillingWorkflowTask, run_automated_milling, status_map
from odemis.acq.move import (
FM_IMAGING,
GRID_1,
Expand Down
5 changes: 0 additions & 5 deletions src/odemis/acq/move.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,6 @@ def __init__(self, microscope):
self.current_posture = model.VigilantAttribute(UNKNOWN)
self.stage.position.subscribe(self._update_posture, init=True)

# feature flags
self.use_3d_transforms: bool = USE_3D_TRANSFORMS
self.use_scan_rotation: bool = USE_SCAN_ROTATION
self.use_linked_sem_focus_compensation: bool = USE_LINKED_SEM_FOCUS_COMPENSATION

def getCurrentPostureLabel(self, pos: Dict[str, float] = None) -> int:
"""
Detects the current stage position of meteor
Expand Down

0 comments on commit 9e9e105

Please sign in to comment.