Skip to content

Commit

Permalink
[fix] SpectrumSettingsStream with light component didn't turn light o…
Browse files Browse the repository at this point in the history
…n during acquisition

It would turn on the light during live stream, but the actual
acquisition, done in the MDStream wouldn't turn on the light.
=> use (un)linkHwVAs() to turn on/off the light. These functions
are used (automaticall) both in live and acquisition modes.
  • Loading branch information
pieleric committed Nov 29, 2024
1 parent a07eed2 commit 02ddc71
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 20 deletions.
45 changes: 25 additions & 20 deletions src/odemis/acq/stream/_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import time

import numbers
from typing import Tuple, Any, Dict
from typing import Tuple, Any, Dict, Optional

import numpy

Expand Down Expand Up @@ -723,8 +723,6 @@ def __init__(self, name, detector, dataflow, emitter, light=None, **kwargs):
# Contains one 1D spectrum (start with an empty array)
self.image.value = model.DataArray([])

# TODO: grating/cw as VAs (from the spectrometer)

if self.light:
# Current channel index to be used for channel's power update
self._channel_idx = 0
Expand All @@ -735,33 +733,40 @@ def __init__(self, name, detector, dataflow, emitter, light=None, **kwargs):
unit=self.light.power.unit)
self.power.subscribe(self._onPower)

def _onPower(self, value):
def _onPower(self, power: float):
"""
Update the light power with the current channel value if the stream is active/playing.
:param value (float): current channel value
:param power: new light power
"""
if self.is_active.value:
pwr = list(self.light.power.range[0])
pwr[self._channel_idx] = value
self.light.power.value = pwr
self._setup_light(power)

def _onActive(self, active):
if active:
if self.light:
# Call _onPower to update the power of the light
self._onPower(self.power.value)
super()._onActive(active)
else:
super()._onActive(active)
# Overrides the (un)linkHwVAs methods, so that the MDStream also turn on/off the light during synchronized acquisition
def _linkHwVAs(self):
super()._linkHwVAs()
if self.light:
self._setup_light()

def _unlinkHwVAs(self):
if self.light:
self._stop_light()
super()._unlinkHwVAs()

def _stop_light(self):
def _setup_light(self, ch_power: Optional[float] = None):
"""
Ensures the light is turned off (temporarily)
Sets the light power to the current value of the power VA.
:param ch_power: The power to set the light to. If None, the current value of the power VA is used.
"""
if self.light is None:
return
if ch_power is None:
ch_power = self.power.value
pwr = list(self.light.power.range[0]) # Other channels should be off
pwr[self._channel_idx] = ch_power
self.light.power.value = pwr

def _stop_light(self):
"""
Ensures the light is turned off
"""
# set the light power to the minimum of the range, in most cases this is 0.0
self.light.power.value = self.light.power.range[0]

Expand Down
118 changes: 118 additions & 0 deletions src/odemis/acq/test/stream_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
SPARC2STREAK_CONFIG = CONFIG_PATH + "sim/sparc2-streakcam-sim.odm.yaml"
SPARC2_4SPEC_CONFIG = CONFIG_PATH + "sim/sparc2-4spec-sim.odm.yaml"
SPARC2_INDE_EBIC_CONFIG = CONFIG_PATH + "sim/sparc2-independent-ebic-sim.odm.yaml"
SPARC2_FPLM_CONFIG = CONFIG_PATH + "sim/sparc2-fplm-sim.odm.yaml"
TIME_CORRELATOR_CONFIG = CONFIG_PATH + "sim/sparc2-time-correlator-sim.odm.yaml"
SPARC2_HWSYNC_CONFIG = CONFIG_PATH + "sim/sparc2-nidaq-sim.odm.yaml"

Expand Down Expand Up @@ -4011,6 +4012,123 @@ def test_ar_acq_integrated_images_leech(self):
self.assertGreaterEqual(ar_drift.shape[-4], 2)


class SPARC2TestCaseFPLM(unittest.TestCase):
"""
This test case is specifically targeting the FPLM systems, with PL acquisition
"""
@classmethod
def setUpClass(cls):
testing.start_backend(SPARC2_FPLM_CONFIG)

# Find components
cls.microscope = model.getMicroscope()
cls.ebeam = model.getComponent(role="e-beam")
cls.sed = model.getComponent(role="se-detector")
cls.spec = model.getComponent(role="spectrometer")
cls.spgp = model.getComponent(role="spectrograph")
cls.filter = model.getComponent(role="filter")
cls.light = model.getComponent(role="light")

def setUp(self):
self._images = []

def _on_image(self, im):
self._images.append(im)

def test_spec_light_ss(self):
""" Test SpectrumSettingsStream with a light source """
# Create the stream
specs = stream.SpectrumSettingsStream("test",
self.spec, self.spec.data, self.ebeam,
light=self.light,
detvas={"exposureTime", "readoutRate", "binning", "resolution"})
specs.image.subscribe(self._on_image)

# shouldn't affect
specs.roi.value = (0.15, 0.6, 0.8, 0.8)
specs.repetition.value = (5, 6)

specs.detExposureTime.value = 0.3 # s

# Light has only one channel, so it's easy to handle
self.assertEqual(self.light.power.value, [0]) # Should start off
light_pwr = self.light.power.range[1][0] # max
specs.power.value = light_pwr

# Start acquisition
specs.should_update.value = True
specs.is_active.value = True

time.sleep(2)
# The light should be on
self.assertEqual(self.light.power.value, [light_pwr])

specs.is_active.value = False

self.assertGreater(len(self._images), 0, "No spectrum received after 2s")
self.assertIsInstance(self._images[0], model.DataArray)
# .image should be a 1D spectrum
self.assertEqual(self._images[0].shape, (specs.detResolution.value[0],))

# The light should be off
self.assertEqual(self.light.power.value, [0])

specs.image.unsubscribe(self._on_image)

def test_acq_spec_light(self):
"""
Test acquisition for Spectrometer with input light
"""
# Create the stream
sems = stream.SEMStream("test sem", self.sed, self.sed.data, self.ebeam)
specs = stream.SpectrumSettingsStream("test spec", self.spec, self.spec.data, self.ebeam,
light=self.light,
detvas={"exposureTime", "readoutRate", "binning", "resolution"})
sps = stream.SEMSpectrumMDStream("test sem-spec", [sems, specs])

specs.roi.value = (0.15, 0.6, 0.8, 0.8)

# Long acquisition (small rep to avoid being too long) > 2s
specs.detExposureTime.value = 0.3 # s
specs.repetition.value = (5, 6)
# exp_pos, exp_pxs, exp_res = self._roiToPhys(specs)

# Light has only one channel, so it's easy to handle
self.assertEqual(self.light.power.value, [0]) # Should start off
light_pwr = self.light.power.range[1][0] / 2 # half the power
specs.power.value = light_pwr

# Start acquisition
timeout = 1 + 1.5 * sps.estimateAcquisitionTime()
start = time.time()
f = sps.acquire()

time.sleep(2) # Wait long enough so that it really started
# The light should be on
self.assertEqual(self.light.power.value, [light_pwr])

# wait until it's over
data = f.result(timeout)
dur = time.time() - start
logging.debug("Acquisition took %g s", dur)
self.assertTrue(f.done())
self.assertEqual(len(data), len(sps.raw))

# The light should be off
self.assertEqual(self.light.power.value, [0])

# There should be metadata about the light
sp_da = sps.raw[1]
sshape = sp_da.shape
self.assertEqual(len(sshape), 5)
self.assertGreater(sshape[0], 1) # should have at least 2 wavelengths
spec_md = sp_da.metadata
self.assertAlmostEqual(spec_md[model.MD_LIGHT_POWER], light_pwr)
self.assertIsInstance(spec_md[model.MD_IN_WL], tuple)
sp_dims = spec_md.get(model.MD_DIMS, "CTZYX"[-sp_da.ndim::])
self.assertEqual(sp_dims, "CTZYX")


class SPARC2TestCaseIndependentDetector(unittest.TestCase):
"""
This test case is specifically targeting the IndependentEBICStream
Expand Down

0 comments on commit 02ddc71

Please sign in to comment.