Skip to content

Commit

Permalink
Merge pull request #2956 from pieleric/fix-spectrumsettingsstream-wit…
Browse files Browse the repository at this point in the history
…h-light-component-didn-t-turn-light-on-during-acquisition

[NSRPARMA-117] [fix] SpectrumSettingsStream with light component didn't turn light on during acquisition
  • Loading branch information
pieleric authored Dec 6, 2024
2 parents 1239dfa + 02ddc71 commit c51a12c
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 20 deletions.
25 changes: 25 additions & 0 deletions install/linux/usr/share/odemis/sim/sparc2-fplm-sim.odm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,31 @@
affects: ["Camera", "Spectral Camera", "Spectrometer Vis-NIR", "Spectrometer IR"],
}

# Input laser controlled via a DAQ MCC device
"External Laser": {
class: pwrmccdaq.MCCDeviceLight,
role: light,
init: {
mcc_device: "fake",
ao_channels: [0],
do_channels: [7],
# 99% low, 25% low, centre, 25% high, 99% high wavelength in m
spectra: [[527.e-9, 531.e-9, 532.e-9, 533.e-9, 537.e-9]],
# Relation curve of voltage -> power, as linear segments
pwr_curve: [
{
# Voltage should be 0->5V, with max power specified as 100mW
0: 0, # V -> W
5: 0.1, # 100mW
},
],
# di_channels port B 8-15 -> pin 32-39
# specified as [name, TLL_HIGH]
di_channels: {14: ["interlockTriggered", False]},
},
affects: ["Camera", "Spectral Camera", "Spectrometer Vis-NIR", "Spectrometer IR"],
}

"Power Control Unit": {
class: powerctrl.PowerControlUnit,
role: "power-control",
Expand Down
45 changes: 25 additions & 20 deletions src/odemis/acq/stream/_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import time

import numbers
from typing import Tuple, Any, Dict
from typing import Tuple, Any, Dict, Optional

import numpy

Expand Down Expand Up @@ -723,8 +723,6 @@ def __init__(self, name, detector, dataflow, emitter, light=None, **kwargs):
# Contains one 1D spectrum (start with an empty array)
self.image.value = model.DataArray([])

# TODO: grating/cw as VAs (from the spectrometer)

if self.light:
# Current channel index to be used for channel's power update
self._channel_idx = 0
Expand All @@ -735,33 +733,40 @@ def __init__(self, name, detector, dataflow, emitter, light=None, **kwargs):
unit=self.light.power.unit)
self.power.subscribe(self._onPower)

def _onPower(self, value):
def _onPower(self, power: float):
"""
Update the light power with the current channel value if the stream is active/playing.
:param value (float): current channel value
:param power: new light power
"""
if self.is_active.value:
pwr = list(self.light.power.range[0])
pwr[self._channel_idx] = value
self.light.power.value = pwr
self._setup_light(power)

def _onActive(self, active):
if active:
if self.light:
# Call _onPower to update the power of the light
self._onPower(self.power.value)
super()._onActive(active)
else:
super()._onActive(active)
# Overrides the (un)linkHwVAs methods, so that the MDStream also turn on/off the light during synchronized acquisition
def _linkHwVAs(self):
super()._linkHwVAs()
if self.light:
self._setup_light()

def _unlinkHwVAs(self):
if self.light:
self._stop_light()
super()._unlinkHwVAs()

def _stop_light(self):
def _setup_light(self, ch_power: Optional[float] = None):
"""
Ensures the light is turned off (temporarily)
Sets the light power to the current value of the power VA.
:param ch_power: The power to set the light to. If None, the current value of the power VA is used.
"""
if self.light is None:
return
if ch_power is None:
ch_power = self.power.value
pwr = list(self.light.power.range[0]) # Other channels should be off
pwr[self._channel_idx] = ch_power
self.light.power.value = pwr

def _stop_light(self):
"""
Ensures the light is turned off
"""
# set the light power to the minimum of the range, in most cases this is 0.0
self.light.power.value = self.light.power.range[0]

Expand Down
118 changes: 118 additions & 0 deletions src/odemis/acq/test/stream_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
SPARC2STREAK_CONFIG = CONFIG_PATH + "sim/sparc2-streakcam-sim.odm.yaml"
SPARC2_4SPEC_CONFIG = CONFIG_PATH + "sim/sparc2-4spec-sim.odm.yaml"
SPARC2_INDE_EBIC_CONFIG = CONFIG_PATH + "sim/sparc2-independent-ebic-sim.odm.yaml"
SPARC2_FPLM_CONFIG = CONFIG_PATH + "sim/sparc2-fplm-sim.odm.yaml"
TIME_CORRELATOR_CONFIG = CONFIG_PATH + "sim/sparc2-time-correlator-sim.odm.yaml"
SPARC2_HWSYNC_CONFIG = CONFIG_PATH + "sim/sparc2-nidaq-sim.odm.yaml"

Expand Down Expand Up @@ -4011,6 +4012,123 @@ def test_ar_acq_integrated_images_leech(self):
self.assertGreaterEqual(ar_drift.shape[-4], 2)


class SPARC2TestCaseFPLM(unittest.TestCase):
"""
This test case is specifically targeting the FPLM systems, with PL acquisition
"""
@classmethod
def setUpClass(cls):
testing.start_backend(SPARC2_FPLM_CONFIG)

# Find components
cls.microscope = model.getMicroscope()
cls.ebeam = model.getComponent(role="e-beam")
cls.sed = model.getComponent(role="se-detector")
cls.spec = model.getComponent(role="spectrometer")
cls.spgp = model.getComponent(role="spectrograph")
cls.filter = model.getComponent(role="filter")
cls.light = model.getComponent(role="light")

def setUp(self):
self._images = []

def _on_image(self, im):
self._images.append(im)

def test_spec_light_ss(self):
""" Test SpectrumSettingsStream with a light source """
# Create the stream
specs = stream.SpectrumSettingsStream("test",
self.spec, self.spec.data, self.ebeam,
light=self.light,
detvas={"exposureTime", "readoutRate", "binning", "resolution"})
specs.image.subscribe(self._on_image)

# shouldn't affect
specs.roi.value = (0.15, 0.6, 0.8, 0.8)
specs.repetition.value = (5, 6)

specs.detExposureTime.value = 0.3 # s

# Light has only one channel, so it's easy to handle
self.assertEqual(self.light.power.value, [0]) # Should start off
light_pwr = self.light.power.range[1][0] # max
specs.power.value = light_pwr

# Start acquisition
specs.should_update.value = True
specs.is_active.value = True

time.sleep(2)
# The light should be on
self.assertEqual(self.light.power.value, [light_pwr])

specs.is_active.value = False

self.assertGreater(len(self._images), 0, "No spectrum received after 2s")
self.assertIsInstance(self._images[0], model.DataArray)
# .image should be a 1D spectrum
self.assertEqual(self._images[0].shape, (specs.detResolution.value[0],))

# The light should be off
self.assertEqual(self.light.power.value, [0])

specs.image.unsubscribe(self._on_image)

def test_acq_spec_light(self):
"""
Test acquisition for Spectrometer with input light
"""
# Create the stream
sems = stream.SEMStream("test sem", self.sed, self.sed.data, self.ebeam)
specs = stream.SpectrumSettingsStream("test spec", self.spec, self.spec.data, self.ebeam,
light=self.light,
detvas={"exposureTime", "readoutRate", "binning", "resolution"})
sps = stream.SEMSpectrumMDStream("test sem-spec", [sems, specs])

specs.roi.value = (0.15, 0.6, 0.8, 0.8)

# Long acquisition (small rep to avoid being too long) > 2s
specs.detExposureTime.value = 0.3 # s
specs.repetition.value = (5, 6)
# exp_pos, exp_pxs, exp_res = self._roiToPhys(specs)

# Light has only one channel, so it's easy to handle
self.assertEqual(self.light.power.value, [0]) # Should start off
light_pwr = self.light.power.range[1][0] / 2 # half the power
specs.power.value = light_pwr

# Start acquisition
timeout = 1 + 1.5 * sps.estimateAcquisitionTime()
start = time.time()
f = sps.acquire()

time.sleep(2) # Wait long enough so that it really started
# The light should be on
self.assertEqual(self.light.power.value, [light_pwr])

# wait until it's over
data = f.result(timeout)
dur = time.time() - start
logging.debug("Acquisition took %g s", dur)
self.assertTrue(f.done())
self.assertEqual(len(data), len(sps.raw))

# The light should be off
self.assertEqual(self.light.power.value, [0])

# There should be metadata about the light
sp_da = sps.raw[1]
sshape = sp_da.shape
self.assertEqual(len(sshape), 5)
self.assertGreater(sshape[0], 1) # should have at least 2 wavelengths
spec_md = sp_da.metadata
self.assertAlmostEqual(spec_md[model.MD_LIGHT_POWER], light_pwr)
self.assertIsInstance(spec_md[model.MD_IN_WL], tuple)
sp_dims = spec_md.get(model.MD_DIMS, "CTZYX"[-sp_da.ndim::])
self.assertEqual(sp_dims, "CTZYX")


class SPARC2TestCaseIndependentDetector(unittest.TestCase):
"""
This test case is specifically targeting the IndependentEBICStream
Expand Down

0 comments on commit c51a12c

Please sign in to comment.