Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TRCL-570][feat] introduce startScan Event to synchronize CCD with e-beam during SPARC acq #3027

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 33 additions & 32 deletions src/odemis/acq/stream/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def __init__(self, name, streams):
self._current_scan_area = None # l,t,r,b (int)

# Start threading event for live update overlay
self._live_update_period = 2
self._live_update_period = 2 # s
self._im_needs_recompute = threading.Event()
self._init_thread(self._live_update_period)

Expand Down Expand Up @@ -1083,7 +1083,7 @@ def __init__(self, name, streams):
self._sccd = s1
self._ccd = s1._detector
self._ccd_df = s1._dataflow
self._trigger = self._ccd.softwareTrigger
self._trigger = self._emitter.startScan # to acquire a CCD image every time the SEM starts a new scan
self._ccd_idx = len(self._streams) - 1 # optical detector is always last in streams

def _supports_hw_sync(self):
Expand Down Expand Up @@ -1653,11 +1653,6 @@ def _runAcquisitionEbeam(self, future):
# retrigger, or unsynchronise/resynchronise just before the end of
# last scan).

# prepare detector
self._ccd_df.synchronizedOn(self._trigger)
# subscribe to last entry in _subscribers (optical detector)
self._ccd_df.subscribe(self._subscribers[self._ccd_idx])

# Instead of subscribing/unsubscribing to the SEM for each pixel,
# we've tried to keep subscribed, but request to be unsynchronised/
# synchronised. However, synchronizing doesn't cancel the current
Expand Down Expand Up @@ -1699,6 +1694,12 @@ def _runAcquisitionEbeam(self, future):
self._emitter.resolution.value,
self._emitter.scale.value)

# Prepare CCD: acquire one frame every time the SEM starts scanning.
# The SEM may scan multiple times for each CCD frame.
self._ccd_df.synchronizedOn(self._trigger)
# Get the CCD ready to acquire
self._ccd_df.subscribe(self._subscribers[self._ccd_idx])

last_ccd_update = 0
start_t = time.time()
n = 0 # number of images acquired so far
Expand Down Expand Up @@ -1887,26 +1888,10 @@ def _acquireImage(self, n, px_idx, img_time, sem_time,
raise CancelledError()

# subscribe to _subscribers
# As soon as the e-beam start scanning (which can take a couple of ms), the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# As soon as the e-beam start scanning (which can take a couple of ms), the
# As soon as the e-beam starts scanning (which can take a couple of ms), the

# startScan event is sent, which triggers the acquisition of one CCD frame.
for s, sub in zip(self._streams[:-1], self._subscribers[:-1]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment on why the last element is excluded?

s._dataflow.subscribe(sub)
# TODO: in theory (aka in a perfect world), the ebeam would immediately
# be at the requested position after the subscription starts. However,
# that's not exactly the case due to:
# * physics limits the speed of voltage change in the ebeam column,
# so it takes the "settle time" before the beam is at the right
# place (in the order of 10 µs).
# * the (odemis) driver is asynchronous, and between the moment it
# receives the request to start and the actual moment it asks the
# hardware to change voltages, several ms might have passed.
# One thing that would help is to not park the e-beam between each
# spot. This way, the ebeam would reach the position much quicker,
# and if it's not yet at the right place, it's still not that far.
# In the meantime, waiting a tiny bit ensures the CCD receives the
# right data.
time.sleep(5e-3) # give more chances spot has been already processed

# send event to detector to acquire one image
self._trigger.notify()

# wait for detector to acquire image
timedout = self._waitForImage(img_time)
Expand Down Expand Up @@ -1971,6 +1956,11 @@ def _acquireImage(self, n, px_idx, img_time, sem_time,
leech_nimg[li] -= 1
if leech_nimg[li] == 0:
try:
# Temporarily switch the CCD to a different event trigger, so that it
# doesn't get triggered while the leech is running (because it could use the
# e-beam, which would send a startScan event)
self._ccd_df.synchronizedOn(self._ccd.softwareTrigger)

nimg = l.next([d[-1] for d in self._acq_data])
logging.debug("Ran leech %s successfully. Will run next leech after %s acquisitions.", l, nimg)
except Exception:
Expand All @@ -1980,6 +1970,9 @@ def _acquireImage(self, n, px_idx, img_time, sem_time,
if self._acq_state == CANCELLED:
raise CancelledError()

# re-use the real trigger
self._ccd_df.synchronizedOn(self._trigger)

# Since we reached this point means everything went fine, so
# no need to retry
break
Expand Down Expand Up @@ -2008,9 +2001,9 @@ def _runAcquisitionScanStage(self, future):
# The idea of the acquiring with a scan stage:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# The idea of the acquiring with a scan stage:
# The idea of acquisition with a scan stage:

# (Note we expect the scan stage to be about at the center of its range)
# * Move the ebeam to 0, 0 (center), for the best image quality
# * Start CCD acquisition with software synchronisation
# * Start CCD acquisition with synchronisation on e-beam startScan
# * Move to next position with the stage and wait for it
# * Start SED acquisition and trigger CCD
# * Start SED acquisition -> startScan event triggers CCD
# * Wait for the CCD/SED data
# * Repeat until all the points have been scanned
# * Move back the stage to center in case of an 'independent' stage
Expand Down Expand Up @@ -2119,12 +2112,11 @@ def _runAcquisitionScanStage(self, future):
if self._acq_state == CANCELLED:
raise CancelledError()

# Start e-beam scan. As soon as it really start, a startScan event is sent, which
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Start e-beam scan. As soon as it really start, a startScan event is sent, which
# Start e-beam scan. As soon as it really starts, a startScan event is sent, which

# triggers the CCD acquisition.
for s, sub in zip(self._streams[:-1], self._subscribers[:-1]):
s._dataflow.subscribe(sub)

time.sleep(5e-3) # give more chances spot has been already processed
self._trigger.notify()

# wait for detector to acquire image
timedout = self._waitForImage(px_time)

Expand Down Expand Up @@ -2210,6 +2202,11 @@ def _runAcquisitionScanStage(self, future):
sstage.moveAbsSync(orig_spos)
prev_spos.update(orig_spos)
try:
# Temporarily switch the CCD to a different event trigger, so that it
# doesn't get triggered while the leech is running (because it could use the
# e-beam, which would send a startScan event)
self._ccd_df.synchronizedOn(self._ccd.softwareTrigger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe an assert line can be used to check that the notification boolean value of startscan is False before reusing the real trigger? Same for _acquireImage


np = l.next([d[-1] for d in self._acq_data])
except Exception:
logging.exception("Leech %s failed, will retry next pixel", l)
Expand All @@ -2218,6 +2215,9 @@ def _runAcquisitionScanStage(self, future):
if self._acq_state == CANCELLED:
raise CancelledError()

# re-use the real trigger
self._ccd_df.synchronizedOn(self._trigger)

for i, das in enumerate(self._acq_data):
self._assembleLiveData(i, das[-1], px_idx, cor_pos, rep, 0)

Expand Down Expand Up @@ -2513,6 +2513,7 @@ def _runAcquisition(self, future):
self._df0.synchronizedOn(self._trigger)
for s, sub in zip(self._streams, self._subscribers):
s._dataflow.subscribe(sub)

start = time.time()
self._acq_min_date = start
self._trigger.notify()
Expand All @@ -2529,8 +2530,8 @@ def _runAcquisition(self, future):
for i, s in enumerate(self._streams):
timeout = max(0.1, max_end_t - time.time())
if not self._acq_complete[i].wait(timeout):
raise TimeoutError("Acquisition of repetition stream for frame %s timed out after %g s"
% (self._emitter.translation.value, time.time() - max_end_t))
raise TimeoutError("Acquisition of repetition stream at pos %s timed out after %g s"
% (self._emitter.translation.value, time.time() - start))
if self._acq_state == CANCELLED:
raise CancelledError()
s._dataflow.unsubscribe(self._subscribers[i])
Expand Down
127 changes: 26 additions & 101 deletions src/odemis/driver/semcomedi.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,6 @@ def __init__(self, name, role, children, device, daemon=None, **kwargs):
raise ValueError("SEMComedi device '%s' was not given a 'scanner' child" % device)
self._scanner = Scanner(parent=self, daemon=daemon, **ckwargs)
self.children.value.add(self._scanner)
# for scanner.newPixel
self._new_position_thread = None
self._new_position_thread_pipe = [] # list to communicate with the current thread

self._acquisition_thread = None

Expand Down Expand Up @@ -987,10 +984,7 @@ def write_read_2d_data_raw(self, wchannels, wranges, rchannels, rranges,
# We write at the given period, and read "osr" samples for each pixel
nrchans = len(rchannels)

if dpr > 1 or (self._scanner.newPixel.hasListeners() and period >= 1e-3):
# if the newPixel event is used, prefer the per pixel write/read
# as it's much more precise (albeit a bit slower). It just needs to
# not be too costly (1 ms should be higher than the setup cost).
if dpr > 1:
force_per_pixel = True
else:
force_per_pixel = False
Expand Down Expand Up @@ -1033,11 +1027,6 @@ def _write_read_2d_lines(self, wchannels, wranges, rchannels, rranges,
Implementation of write_read_2d_data_raw by reading the input data n
lines at a time.
"""
if self._scanner.newPixel.hasListeners() and margin > 0:
# we don't support margin detection on multiple lines for
# newPixel trigger.
maxlines = 1

logging.debug(u"Reading %d lines at a time: %d samples/read every %g µs",
maxlines, maxlines * data.shape[1] * osr * len(rchannels),
period * 1e6)
Expand Down Expand Up @@ -1243,19 +1232,11 @@ def _fake_write_read_raw_one_cmd(self, wchannels, wranges, rchannels, rranges,
stop_arg=nrscans)
start = time.time()

np_to_report = nwscans - settling_samples
shift_report = settling_samples
if settling_samples == 0: # indicate a new ebeam position
self._scanner.newPixel.notify()
np_to_report -= 1
shift_report += 1
self._scanner.startScan.notify() # Special event that will only actually notify on the first call

# run the commands
self._reader.run()
self._writer.run()
self._start_new_position_notifier(np_to_report,
start + shift_report * period,
period)

timeout = expected_time * 1.10 + 0.1 # s == expected time + 10% + 0.1s
logging.debug("Waiting %g s for the acquisition to finish", timeout)
Expand Down Expand Up @@ -1357,22 +1338,11 @@ def _write_read_raw_one_cmd(self, wchannels, wranges, rchannels, rranges,
comedi.internal_trigger(self._device, self._ao_subdevice, self._ao_trig)

comedi.internal_trigger(self._device, self._ai_subdevice, 0)
start = time.time()

np_to_report = nwscans - settling_samples
shift_report = settling_samples
if settling_samples == 0:
# no margin => indicate a new ebeam position right now
self._scanner.newPixel.notify()
np_to_report -= 1
shift_report += 1
self._scanner.startScan.notify() # Special event that will only actually notify on the first call

self._reader.run()
if nwscans != 1:
self._writer.run()
self._start_new_position_notifier(np_to_report,
start + shift_report * period,
period)

timeout = expected_time * 1.10 + 0.1 # s == expected time + 10% + 0.1s
logging.debug("Waiting %g s for the acquisition to finish", timeout)
Expand Down Expand Up @@ -1554,68 +1524,6 @@ def _write_count_raw_one_cmd(self, wchannels, wranges, counter,
logging.debug("Counter sync read after %g s", time.time() - start)
return rbuf

def _start_new_position_notifier(self, n, start, period):
"""
Notify the newPixel Event n times with the given period.
n (0 <= int): number of event notifications
start (float): time for the first event (should be in the future)
period (float): period between two events
Note: this is used to emulate an actual ebeam change of position when
the hardware is requested to move the ebeam at multiple positions in a
row. Do not expect a precision better than 10us.
Note 2: this method returns immediately (and the emulation is run in a
separate thread).
"""
# no need if no one's listening
if not self._scanner.newPixel.hasListeners():
return

if n <= 0:
return

if period < 10e-6:
# don't even try: that's the time it'd take to have just one loop
# doing nothing
logging.error(u"Cannot generate newPixel events at such a "
u"small period of %s µs", period * 1e6)
return

self._new_position_thread_pipe = []
self._new_position_thread = threading.Thread(
target=self._notify_new_position,
args=(n, start, period, self._new_position_thread_pipe),
name="SEM new position notifier")

self._new_position_thread.start()

def _notify_new_position(self, n, start, period, pipe):
"""
The thread content
"""
trigger = 0
failures = 0
for i in range(n):
now = time.time()
trigger += period # accumulation error should be small
left = start - now + trigger
if left > 0:
if left > 10e-6: # TODO: if left < 1 ms => use usleep or nsleep
time.sleep(left)
else:
failures += 1
if pipe: # put anything in the pipe and it will mean it has to stop
logging.debug("npnotifier received cancel message")
return
self._scanner.newPixel.notify()

if failures:
logging.warning(u"Failed to trigger newPixel in time %d times, "
u"last trigger was %g µs late.", failures, -left * 1e6)

def _cancel_new_position_notifier(self):
logging.debug("cancelling npnotifier")
self._new_position_thread_pipe.append(True) # means it has to stop

def start_acquire(self, detector):
"""
Start acquiring images on the given detector (i.e., input channel).
Expand Down Expand Up @@ -1744,6 +1652,8 @@ def _acquisition_run(self):
self.set_to_resting_position()
# wait until something new comes in
self._check_cmd_q(block=True)

self._scanner.startScan.clear() # Get ready for the next scan
except CancelledError:
logging.info("Acquisition threading terminated on request")
except Exception:
Expand All @@ -1755,6 +1665,7 @@ def _acquisition_run(self):
except comedi.ComediError:
# can happen if the driver already terminated
pass
self._scanner.startScan.clear() # Get ready for the next scan
logging.info("Acquisition thread closed")
self._acquisition_thread = None

Expand All @@ -1766,7 +1677,6 @@ def _req_stop_acquisition(self):
# So it's protected with the init of read/write and set_to_resting_position
with self._acquisition_init_lock:
self._acquisition_must_stop.set()
self._cancel_new_position_notifier()
# Cancelling parts which are not running is a no-op
self._writer.cancel()
self._reader.cancel()
Expand Down Expand Up @@ -2611,6 +2521,23 @@ def cancel(self):
self._must_stop.set()


class EventOnce(model.Event):
"""
Special Event class which passes the event only once to the listener, until it's reset.
"""
def __init__(self):
super().__init__()
self._notified = False

def notify(self):
if not self._notified:
self._notified = True
super().notify()

def clear(self):
self._notified = False


class Scanner(model.Emitter):
"""
Represents the e-beam scanner
Expand Down Expand Up @@ -2761,6 +2688,9 @@ def __init__(self, name, role, parent, channels, limits, settle_time,
t.start()
self.indicate_scan_state(False)

# Event which is triggered at the beginning of the first frame of a scan
self.startScan = EventOnce()

# In theory the maximum resolution depends on the X/Y ranges, the actual
# ranges that can be used and the maxdata. It also depends on the noise
# on the scanning cable, and the scanning precision of the SEM.
Expand Down Expand Up @@ -2837,11 +2767,6 @@ def __init__(self, name, role, parent, channels, limits, settle_time,
self.dwellTime = model.FloatContinuous(min_dt, range_dwell,
unit="s", setter=self._setDwellTime)

# event to allow another component to synchronize on the beginning of
# a pixel position. Only sent during an actual pixel of a scan, not for
# the beam settling time or when put to rest.
self.newPixel = model.Event()

self._prev_settings = [None, None, None, None] # resolution, scale, translation, margin
self._scan_array = None # last scan array computed

Expand Down
Loading
Loading