diff --git a/src/odemis/acq/stream/_helper.py b/src/odemis/acq/stream/_helper.py index a85ec2e91a..d9525e55f6 100644 --- a/src/odemis/acq/stream/_helper.py +++ b/src/odemis/acq/stream/_helper.py @@ -26,7 +26,7 @@ import time import numbers -from typing import Tuple, Any, Dict +from typing import Tuple, Any, Dict, Optional import numpy @@ -723,8 +723,6 @@ def __init__(self, name, detector, dataflow, emitter, light=None, **kwargs): # Contains one 1D spectrum (start with an empty array) self.image.value = model.DataArray([]) - # TODO: grating/cw as VAs (from the spectrometer) - if self.light: # Current channel index to be used for channel's power update self._channel_idx = 0 @@ -735,33 +733,40 @@ def __init__(self, name, detector, dataflow, emitter, light=None, **kwargs): unit=self.light.power.unit) self.power.subscribe(self._onPower) - def _onPower(self, value): + def _onPower(self, power: float): """ Update the light power with the current channel value if the stream is active/playing. - :param value (float): current channel value + :param power: new light power """ if self.is_active.value: - pwr = list(self.light.power.range[0]) - pwr[self._channel_idx] = value - self.light.power.value = pwr + self._setup_light(power) - def _onActive(self, active): - if active: - if self.light: - # Call _onPower to update the power of the light - self._onPower(self.power.value) - super()._onActive(active) - else: - super()._onActive(active) + # Overrides the (un)linkHwVAs methods, so that the MDStream also turn on/off the light during synchronized acquisition + def _linkHwVAs(self): + super()._linkHwVAs() + if self.light: + self._setup_light() + + def _unlinkHwVAs(self): + if self.light: self._stop_light() + super()._unlinkHwVAs() - def _stop_light(self): + def _setup_light(self, ch_power: Optional[float] = None): """ - Ensures the light is turned off (temporarily) + Sets the light power to the current value of the power VA. + :param ch_power: The power to set the light to. If None, the current value of the power VA is used. """ - if self.light is None: - return + if ch_power is None: + ch_power = self.power.value + pwr = list(self.light.power.range[0]) # Other channels should be off + pwr[self._channel_idx] = ch_power + self.light.power.value = pwr + def _stop_light(self): + """ + Ensures the light is turned off + """ # set the light power to the minimum of the range, in most cases this is 0.0 self.light.power.value = self.light.power.range[0] diff --git a/src/odemis/acq/test/stream_test.py b/src/odemis/acq/test/stream_test.py index 8f193bb5c2..7a0ccab386 100644 --- a/src/odemis/acq/test/stream_test.py +++ b/src/odemis/acq/test/stream_test.py @@ -59,6 +59,7 @@ SPARC2STREAK_CONFIG = CONFIG_PATH + "sim/sparc2-streakcam-sim.odm.yaml" SPARC2_4SPEC_CONFIG = CONFIG_PATH + "sim/sparc2-4spec-sim.odm.yaml" SPARC2_INDE_EBIC_CONFIG = CONFIG_PATH + "sim/sparc2-independent-ebic-sim.odm.yaml" +SPARC2_FPLM_CONFIG = CONFIG_PATH + "sim/sparc2-fplm-sim.odm.yaml" TIME_CORRELATOR_CONFIG = CONFIG_PATH + "sim/sparc2-time-correlator-sim.odm.yaml" SPARC2_HWSYNC_CONFIG = CONFIG_PATH + "sim/sparc2-nidaq-sim.odm.yaml" @@ -4011,6 +4012,123 @@ def test_ar_acq_integrated_images_leech(self): self.assertGreaterEqual(ar_drift.shape[-4], 2) +class SPARC2TestCaseFPLM(unittest.TestCase): + """ + This test case is specifically targeting the FPLM systems, with PL acquisition + """ + @classmethod + def setUpClass(cls): + testing.start_backend(SPARC2_FPLM_CONFIG) + + # Find components + cls.microscope = model.getMicroscope() + cls.ebeam = model.getComponent(role="e-beam") + cls.sed = model.getComponent(role="se-detector") + cls.spec = model.getComponent(role="spectrometer") + cls.spgp = model.getComponent(role="spectrograph") + cls.filter = model.getComponent(role="filter") + cls.light = model.getComponent(role="light") + + def setUp(self): + self._images = [] + + def _on_image(self, im): + self._images.append(im) + + def test_spec_light_ss(self): + """ Test SpectrumSettingsStream with a light source """ + # Create the stream + specs = stream.SpectrumSettingsStream("test", + self.spec, self.spec.data, self.ebeam, + light=self.light, + detvas={"exposureTime", "readoutRate", "binning", "resolution"}) + specs.image.subscribe(self._on_image) + + # shouldn't affect + specs.roi.value = (0.15, 0.6, 0.8, 0.8) + specs.repetition.value = (5, 6) + + specs.detExposureTime.value = 0.3 # s + + # Light has only one channel, so it's easy to handle + self.assertEqual(self.light.power.value, [0]) # Should start off + light_pwr = self.light.power.range[1][0] # max + specs.power.value = light_pwr + + # Start acquisition + specs.should_update.value = True + specs.is_active.value = True + + time.sleep(2) + # The light should be on + self.assertEqual(self.light.power.value, [light_pwr]) + + specs.is_active.value = False + + self.assertGreater(len(self._images), 0, "No spectrum received after 2s") + self.assertIsInstance(self._images[0], model.DataArray) + # .image should be a 1D spectrum + self.assertEqual(self._images[0].shape, (specs.detResolution.value[0],)) + + # The light should be off + self.assertEqual(self.light.power.value, [0]) + + specs.image.unsubscribe(self._on_image) + + def test_acq_spec_light(self): + """ + Test acquisition for Spectrometer with input light + """ + # Create the stream + sems = stream.SEMStream("test sem", self.sed, self.sed.data, self.ebeam) + specs = stream.SpectrumSettingsStream("test spec", self.spec, self.spec.data, self.ebeam, + light=self.light, + detvas={"exposureTime", "readoutRate", "binning", "resolution"}) + sps = stream.SEMSpectrumMDStream("test sem-spec", [sems, specs]) + + specs.roi.value = (0.15, 0.6, 0.8, 0.8) + + # Long acquisition (small rep to avoid being too long) > 2s + specs.detExposureTime.value = 0.3 # s + specs.repetition.value = (5, 6) + # exp_pos, exp_pxs, exp_res = self._roiToPhys(specs) + + # Light has only one channel, so it's easy to handle + self.assertEqual(self.light.power.value, [0]) # Should start off + light_pwr = self.light.power.range[1][0] / 2 # half the power + specs.power.value = light_pwr + + # Start acquisition + timeout = 1 + 1.5 * sps.estimateAcquisitionTime() + start = time.time() + f = sps.acquire() + + time.sleep(2) # Wait long enough so that it really started + # The light should be on + self.assertEqual(self.light.power.value, [light_pwr]) + + # wait until it's over + data = f.result(timeout) + dur = time.time() - start + logging.debug("Acquisition took %g s", dur) + self.assertTrue(f.done()) + self.assertEqual(len(data), len(sps.raw)) + + # The light should be off + self.assertEqual(self.light.power.value, [0]) + + # There should be metadata about the light + sp_da = sps.raw[1] + sshape = sp_da.shape + self.assertEqual(len(sshape), 5) + self.assertGreater(sshape[0], 1) # should have at least 2 wavelengths + spec_md = sp_da.metadata + self.assertAlmostEqual(spec_md[model.MD_LIGHT_POWER], light_pwr) + self.assertIsInstance(spec_md[model.MD_IN_WL], tuple) + sp_dims = spec_md.get(model.MD_DIMS, "CTZYX"[-sp_da.ndim::]) + self.assertEqual(sp_dims, "CTZYX") + + class SPARC2TestCaseIndependentDetector(unittest.TestCase): """ This test case is specifically targeting the IndependentEBICStream