Skip to content

Commit

Permalink
Fixes for some more bugs with TaskFilter where the schedule was out-o…
Browse files Browse the repository at this point in the history
…f-sync because of using the last scan's start time rather than the true current RT, and where the list of ScanParameters was beheaded but the list of expected RTs was not so they were also out-of-sync. Also a bug introduced in the last commit where the first scan id was not based on the schedule's native ids but the others were. Some docstrings for FixedScansController.
  • Loading branch information
mcbrider5002 committed Feb 19, 2024
1 parent 99e4212 commit 4a08635
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 121 deletions.
243 changes: 137 additions & 106 deletions vimms/Controller/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,25 @@

class TaskFilter():
"""
Can be used with [vimms.Controller.misc.FixedScansController][] and its
subclasses to resynchronise scan RTs if they don't go according to
schedule. New scans are dynamically added or deleted if the current RT
is before or after the planned RT by a certain threshold (scaled to scan
lengths).
Can be used with [vimms.Controller.misc.FixedScansController][] and its
subclasses to resynchronise scan RTs if they don't go according to
schedule. New scans are dynamically added or deleted if the current RT
is before or after the planned RT by a certain threshold (scaled to scan
lengths).
"""
def __init__(self, ms1_length, ms2_length, skip_margin=0.5, add_margin=1.2):
"""
Create a new TaskFilter.
Args:
ms1_length: Expected length of MS1 scans.
ms2_length: Expected length of MS2 scans.
skip_margin: Cancel the current task if current RT is closer
to expected RT of the following task than skip_margin *
current_task_length.
add_margin: Add a filler scan if current RT is further away
from expected RT of the current task than add_margin *
min_task_length.
Create a new TaskFilter.
Args:
ms1_length: Expected length of MS1 scans.
ms2_length: Expected length of MS2 scans.
skip_margin: Cancel the current task if current RT is closer
to expected RT of the following task than skip_margin *
current_task_length.
add_margin: Add a filler scan if current RT is further away
from expected RT of the current task than add_margin *
min_task_length.
"""
self.ms1_length = ms1_length
self.ms2_length = ms2_length
Expand All @@ -66,45 +66,52 @@ def _find_nearest_ms2(task_idx, tasks):

return None

@staticmethod
def _make_ms2(task_idx, tasks, scan_id, precursor_id):
@classmethod
def _make_scan(cls, task, scan_id, precursor_id):
if(not task.get(ScanParameters.MS_LEVEL) in [1, 2]):
raise NotImplementedError(
f"""{cls} isn't sure how to copy a ScanParameters with
ms_level = {task.get(ScanParameters.MS_LEVEL)}"""
)

s = copy.deepcopy(task)
s.set(ScanParameters.SCAN_ID, scan_id)
if(s.get(ScanParameters.MS_LEVEL) == 2):
try:
#not sure there is any sensible way to change this without directly
#modifying internals, so here we defend against future changes in
#ScanParameters
for p in s.get(ScanParameters.PRECURSOR_MZ):
test = p.precursor_scan_id
except:
raise NotImplementedError(
f"""
ScanParameters doesn't have the expected internal structure
for precursor information. {cls} is probably about to
make a horrible mistake.
"""
)

for p in s.get(ScanParameters.PRECURSOR_MZ):
p.precursor_scan_id = precursor_id
return s

@classmethod
def _make_ms2(cls, task_idx, tasks, scan_id, precursor_id):
template_scan = TaskFilter._find_nearest_ms2(task_idx, tasks)

if(template_scan is None):
precursor_mz = 100.0
isolation_width = 1.0
else:
precursor_mz = template_scan.get(ScanParameters.PRECURSOR_MZ)[0].precursor_mz
isolation_width = template_scan.get(ScanParameters.ISOLATION_WIDTH)[0]

return get_dda_scan_param(
precursor_mz,
0.0,
precursor_id,
isolation_width,
0.0,
0.0,
scan_id=scan_id
)

@staticmethod
def _make_scan(task, scan_id, precursor_id):
if(task.get(ScanParameters.MS_LEVEL) == 1):
return get_default_scan_params(scan_id=scan_id)
elif(task.get(ScanParameters.MS_LEVEL) == 2):
#note that some input arguments for ScanParameters get coerced to a list
#so when we "copy" objects with a new constructor we have to undo that
return get_dda_scan_param(
task.get(ScanParameters.PRECURSOR_MZ)[0].precursor_mz,
0.0,
precursor_id,
task.get(ScanParameters.ISOLATION_WIDTH)[0],
0.0,
0.0,
precursor_mz=100.0,
intensity=0.0,
precursor_scan_id=precursor_id,
isolation_width=DEFAULT_ISOLATION_WIDTH,
mz_tol=0.0,
rt_tol=0.0,
scan_id=scan_id
)

raise NotImplementedError(f"MS_LEVEL: {task.get(ScanParameters.MS_LEVEL)}")
else:
return cls._make_scan(tasks[task_idx], scan_id, precursor_id)

def _get_task_length(self, task):
if(task.get(ScanParameters.MS_LEVEL) == 1):
Expand All @@ -116,20 +123,20 @@ def _get_task_length(self, task):

def get_task(self, scan, scan_id, precursor_id, task_idx, expected_rts, tasks):
"""
Gets the next task and updates the current task index for the
parent controller.
Gets the next task and updates the current task index for the
parent controller.
Args:
scan: Current scan from parent.
scan_id: ID of scan.
precursor_id: ID of last MS1.
task_idx: Current index in task queue.
expected_rts: Queue of expected RTs corresponding to tasks.
tasks: Full task queue.
Args:
scan: Current scan from parent.
scan_id: ID of scan.
precursor_id: ID of last MS1.
task_idx: Current index in task queue.
expected_rts: Queue of expected RTs corresponding to tasks.
tasks: Full task queue.
Returns: Tuple of new task index and next task.
Returns: Tuple of new task index and next task.
"""
actual_rt = scan.rt
actual_rt = scan.rt + scan.scan_duration
expected_rt = expected_rts[task_idx]
rt_dist = expected_rt - actual_rt

Expand All @@ -140,7 +147,7 @@ def get_task(self, scan, scan_id, precursor_id, task_idx, expected_rts, tasks):
else:
new_task = self._make_ms2(task_idx, tasks, scan_id, precursor_id)
else:
if(rt_dist > self.add_margin * self.ms1_length):
if(rt_dist > self.add_margin * self.ms2_length):
new_task = self._make_ms2(task_idx, tasks, scan_id, precursor_id)
else:
new_task = get_default_scan_params(scan_id=scan_id)
Expand All @@ -161,67 +168,79 @@ def get_task(self, scan, scan_id, precursor_id, task_idx, expected_rts, tasks):

class FixedScansController(Controller):
"""
A controller which takes a schedule of scans, converts them into
tasks in queue
A controller which takes a schedule of scans, and converts them into
tasks in a queue.
The base class for pre-scheduled controllers like
[vimms.Controller.misc.DsDAController][] and
[vimms.Controller.misc.MatchingController][].
"""

def __init__(self, schedule=None, advanced_params=None, expected_rts=None, task_filter=None):
"""
Creates a FixedScansController that accepts a list of schedule of
scan parameters
:param schedule: a list of ScanParameter objects
:param advanced_params: mass spec advanced parameters, if any
:param expected_rts: gives times tasks are expected to appear at
needed to update tasks dynamically with task_filter
:param task_filter: object that examines the task list and adds or deletes
tasks to ensure schedule remains in sync with the actual
RT
Create a FixedScansController.
Args:
schedule: List of [vimms.Common.ScanParameter][] objects.
advanced_params: Instance of [vimms.Controller.base.AdvancedParams][].
expected_rts: List of expected RTs for tasks with indices corresponding
to schedule. Only needed if e.g. resynchronising RTs using
task_filter.
task_filter: Object that dynamically returns tasks on request. For
example [vimms.Controller.misc.TaskFilter][] can be used to
resynchronise unexpected RTs with the expected ones in the schedule.
"""
super().__init__(advanced_params=advanced_params)
self.tasks = None
self.initial_task = None
self.task_idx = 0
self.task_idx = 1 #First scan of schedule is always run
self.expected_rts = expected_rts
self.task_filter = task_filter

if schedule is not None and len(schedule) > 0:
# if schedule is provided, set it
self.set_tasks(schedule)
self.scan_id = INITIAL_SCAN_ID + 1 # MS obj. always starts with an MS1
self.precursor_id = INITIAL_SCAN_ID
self.scan_id = self.initial_task.get(ScanParameters.SCAN_ID) + 1
self.precursor_id = (
self.scan_id - 1
if self.initial_task.get(ScanParameters.MS_LEVEL) == 1
else None
)

def get_initial_tasks(self):
"""
Returns all the remaining scan parameter objects to be pushed to
the mass spec queue
:return: all the remaining tasks
Returns all initial tasks for the mass spec queue.
Returns: List of tasks.
"""
# the remaining scan parameters in the schedule must have been set
assert self.tasks is not None
if(self.task_filter is None):
return self.tasks
return self.tasks[1:]
else:
return []

def get_initial_scan_params(self):
"""
Returns the initial scan parameter object to send when
acquisition starts
:return: the initial task
Returns: The initial task.
"""
# the first scan parameters in the schedule must have been set
assert self.initial_task is not None
return self.initial_task

def set_tasks(self, schedule):
"""
Set the fixed schedule of tasks in this controller
:param schedule: a list of scan parameter objects
:return: None
Set a new schedule for this controller.
Args:
schedule: A list of [vimms.Common.ScanParameter][].
"""
assert isinstance(schedule, list)
self.initial_task = schedule[0] # used for sending the first scan
self.tasks = schedule[1:] # used for sending all the other scans
self.tasks = schedule # used for sending all the other scans

def handle_scan(self, scan, current_size, pending_size):
# simply record every scan that we've received, but return no new tasks
Expand Down Expand Up @@ -252,29 +271,29 @@ def _process_scan(self, scan):

class DsDAController(WrapperController):
"""
A controller which allows running the DsDA (Dataset-Dependent Acquisition)
method.
See the original publication for a description of DsDA:
Broeckling, Hoyes, et al. "Comprehensive Tandem-Mass-Spectrometry coverage
of complex samples enabled by Data-Set-Dependent acquisition."
Analytical Chemistry. 90, 8020–8027 (2018).
A controller which allows running the DsDA (Dataset-Dependent Acquisition)
method.
See the original publication for a description of DsDA:
Broeckling, Hoyes, et al. "Comprehensive Tandem-Mass-Spectrometry coverage
of complex samples enabled by Data-Set-Dependent acquisition."
Analytical Chemistry. 90, 8020–8027 (2018).
"""

def __init__(self, dsda_state, mzml_name, advanced_params=None, task_filter=None):
"""
Initialise a new DsDAController instance.
Args:
dsda_state: An instance of [vimms.DsDA.DsDAState][], wrapping a
live R process running DsDA.
mzml_name: The name of the .mzML file to write for this injection.
advanced_params: a [vimms.Controller.base.AdvancedParams][] object
that contains advanced parameters to control the mass spec.
See [vimms.Controller.base.AdvancedParams][] for defaults.
task_filter: Object that examines the task list and adds or deletes
tasks to ensure schedule remains in sync with the actual RT.
Initialise a new DsDAController instance.
Args:
dsda_state: An instance of [vimms.DsDA.DsDAState][], wrapping a
live R process running DsDA.
mzml_name: The name of the .mzML file to write for this injection.
advanced_params: a [vimms.Controller.base.AdvancedParams][] object
that contains advanced parameters to control the mass spec.
See [vimms.Controller.base.AdvancedParams][] for defaults.
task_filter: Object that examines the task list and adds or deletes
tasks to ensure schedule remains in sync with the actual RT.
"""
self.dsda_state = dsda_state
self.mzml_name = mzml_name
Expand Down Expand Up @@ -583,11 +602,23 @@ def from_fullscan(ms2planner_dir,

class MatchingController(FixedScansController):
"""
A pre-scheduled controller that performs maximum matching to obtain the largest
coverage
A pre-scheduled controller that executes a scan queue planned by
a maximum bipartite matching from [vimms.Matching][].
"""
@classmethod
def from_matching(cls, matching, isolation_width, advanced_params=None, task_filter=None):
"""
Construct a list of MatchingControllers (one for each injection)
from a potentially multi-injection matching.
Args:
matching: Instance of [vimms.Matching.Matching][].
isolation_width: Isolation width in Daltons.
advanced_params: Instance of [vimms.Controller.base.AdvancedParams][].
task_filter: Object that dynamically returns tasks on request. For
example [vimms.Controller.misc.TaskFilter][] can be used to
resynchronise unexpected RTs with the expected ones in the schedule.
"""
return [
MatchingController(
schedule=schedule,
Expand Down
5 changes: 3 additions & 2 deletions vimms/DsDA.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def dsda_get_scan_params(schedule_file, template_file, isolation_width, mz_tol,
Args:
schedule_file: Path to DsDA output file to be read.
template_file: Path to file containing chedule of scan times for the
template_file: Path to file containing schedule of scan times for the
DsDA series.
isolation_width: Isolation width for scans to use.
mz_tol: m/z tolerance for scans to use.
Expand All @@ -169,10 +169,11 @@ def dsda_get_scan_params(schedule_file, template_file, isolation_width, mz_tol,
for i in range(schedule.shape[0]):
if types[i] != 'msms':
precursor_scan_id = scan_id
scan_params = get_default_scan_params(scan_id=precursor_scan_id)
scan_params = get_default_scan_params(scan_id=scan_id)
else:
assert not precursor_scan_id is None
mz = 100 if np.isnan(masses[i]) else masses[i]
#TODO: Should mz_tol and rt_tol be here???
scan_params = get_dda_scan_param(mz, 0.0, precursor_scan_id,
isolation_width, mz_tol, rt_tol,
scan_id=scan_id)
Expand Down
Loading

0 comments on commit 4a08635

Please sign in to comment.