diff --git a/qupulse/program/linspace.py b/qupulse/program/linspace.py index 0d454c09..67f6ef99 100644 --- a/qupulse/program/linspace.py +++ b/qupulse/program/linspace.py @@ -9,7 +9,7 @@ from qupulse.parameter_scope import Scope, MappedScope, FrozenDict from qupulse.program import (ProgramBuilder, HardwareTime, HardwareVoltage, Waveform, RepetitionCount, TimeType, SimpleExpression) -from qupulse.program.waveforms import MultiChannelWaveform +from qupulse.program.waveforms import MultiChannelWaveform, TransformingWaveform, ConstantWaveform, SequenceWaveform # this resolution is used to unify increments # the increments themselves remain floats @@ -194,9 +194,26 @@ def with_repetition(self, repetition_count: RepetitionCount, def with_sequence(self, measurements: Optional[Sequence[MeasurementWindow]] = None) -> ContextManager['ProgramBuilder']: yield self - + + @contextlib.contextmanager def new_subprogram(self, global_transformation: 'Transformation' = None) -> ContextManager['ProgramBuilder']: - raise NotImplementedError('Not implemented yet (postponed)') + + inner_builder = LinSpaceBuilder(self._idx_to_name) + yield inner_builder + inner_program = inner_builder.to_program() + + if inner_program is not None: + #measurements not yet included + # measurements = [(name, begin, length) + # for name, (begins, lengths) in inner_program.get_measurement_windows().items() + # for begin, length in zip(begins, lengths)] + # self._top.add_measurements(measurements) + waveform = to_waveform(inner_program,self._idx_to_name) + if global_transformation is not None: + waveform = TransformingWaveform.from_transformation(waveform, global_transformation) + self.play_arbitrary_waveform(waveform) + + # raise NotImplementedError('Not implemented yet (postponed)') def with_iteration(self, index_name: str, rng: range, measurements: Optional[Sequence[MeasurementWindow]] = None) -> Iterable['ProgramBuilder']: @@ -408,3 +425,28 @@ def to_increment_commands(linspace_nodes: Sequence[LinSpaceNode]) -> List[Comman state.add_node(linspace_nodes) return state.commands + +def to_waveform(program: Sequence[LinSpaceNode], channels: Tuple[ChannelID]) -> Waveform: + + SUPPORTED_NODES = {LinSpaceArbitraryWaveform,LinSpaceHold} + SUPPORTED_INITIAL_NODES = {LinSpaceArbitraryWaveform,LinSpaceHold} + + assert all(type(node) in SUPPORTED_NODES for node in program), 'some node not (yet) supported for single waveform' + assert type(program[0]) in SUPPORTED_INITIAL_NODES, 'initial node not (yet) supported for single waveform' + + sequence = [] + for node in program: + if type(node)==LinSpaceArbitraryWaveform: + sequence.append(node.waveform) + elif type(node)==LinSpaceHold: + assert node.duration_factors is None, 'NotImplemented' + assert all(factor is None for factor in node.factors), 'NotImplemented' + #the channels should be sorted accordingly so we can do this. + sequence.append(ConstantWaveform.from_mapping(node.duration, dict(zip(channels, node.bases)))) + else: + raise NotImplementedError() + + sequenced_waveform = SequenceWaveform.from_sequence( + sequence + ) + return sequenced_waveform \ No newline at end of file diff --git a/qupulse/pulses/__init__.py b/qupulse/pulses/__init__.py index 4a8e1016..00199663 100644 --- a/qupulse/pulses/__init__.py +++ b/qupulse/pulses/__init__.py @@ -18,6 +18,8 @@ from qupulse.pulses.arithmetic_pulse_template import ArithmeticPulseTemplate as ArithmeticPT,\ ArithmeticAtomicPulseTemplate as ArithmeticAtomicPT from qupulse.pulses.time_reversal_pulse_template import TimeReversalPulseTemplate as TimeReversalPT +from qupulse.pulses.time_manipulation_pulse_template import TimeExtensionPulseTemplate as TimeExtensionPT,\ + SingleWFTimeExtensionPulseTemplate as SingleWFTimeExtensionPT import warnings with warnings.catch_warnings(): @@ -31,4 +33,4 @@ __all__ = ["FunctionPT", "ForLoopPT", "AtomicMultiChannelPT", "MappingPT", "RepetitionPT", "SequencePT", "TablePT", "PointPT", "ConstantPT", "AbstractPT", "ParallelConstantChannelPT", "ArithmeticPT", "ArithmeticAtomicPT", - "TimeReversalPT", "ParallelChannelPT"] + "TimeReversalPT", "ParallelChannelPT", "TimeExtensionPT", "SingleWFTimeExtensionPT"] diff --git a/qupulse/pulses/pulse_template.py b/qupulse/pulses/pulse_template.py index 97dd5cda..72803df3 100644 --- a/qupulse/pulses/pulse_template.py +++ b/qupulse/pulses/pulse_template.py @@ -373,6 +373,7 @@ def with_appended(self, *appended: 'PulseTemplate'): return self def pad_to(self, to_new_duration: Union[ExpressionLike, Callable[[Expression], ExpressionLike]], + as_single_wf: bool = True, pt_kwargs: Mapping[str, Any] = None) -> 'PulseTemplate': """Pad this pulse template to the given duration. The target duration can be numeric, symbolic or a callable that returns a new duration from the current @@ -392,13 +393,14 @@ def pad_to(self, to_new_duration: Union[ExpressionLike, Callable[[Expression], E >>> padded_4 = my_pt.pad_to(to_next_multiple(1, 16)) Args: to_new_duration: Duration or callable that maps the current duration to the new duration + as_single_wf: if the pt is intended to be a single waveform padded to adhere to hardware constraints pt_kwargs: Keyword arguments for the newly created sequence pulse template. Returns: A pulse template that has the duration given by ``to_new_duration``. It can be ``self`` if the duration is already as required. It is never ``self`` if ``pt_kwargs`` is non-empty. """ - from qupulse.pulses import ConstantPT, SequencePT + from qupulse.pulses import SingleWFTimeExtensionPT, TimeExtensionPT current_duration = self.duration if callable(to_new_duration): new_duration = to_new_duration(current_duration) @@ -407,11 +409,9 @@ def pad_to(self, to_new_duration: Union[ExpressionLike, Callable[[Expression], E pad_duration = new_duration - current_duration if not pt_kwargs and pad_duration == 0: return self - pad_pt = ConstantPT(pad_duration, self.final_values) - if pt_kwargs: - return SequencePT(self, pad_pt, **pt_kwargs) - else: - return self @ pad_pt + if as_single_wf: + return SingleWFTimeExtensionPT(self,0.,pad_duration,**pt_kwargs if pt_kwargs is not None else {}) + return TimeExtensionPT(self,0.,pad_duration,**pt_kwargs if pt_kwargs is not None else {}) def __format__(self, format_spec: str): if format_spec == '': diff --git a/qupulse/pulses/sequence_pulse_template.py b/qupulse/pulses/sequence_pulse_template.py index 5107bb10..56939f66 100644 --- a/qupulse/pulses/sequence_pulse_template.py +++ b/qupulse/pulses/sequence_pulse_template.py @@ -40,7 +40,8 @@ def __init__(self, identifier: Optional[str]=None, parameter_constraints: Optional[Iterable[ConstraintLike]]=None, measurements: Optional[List[MeasurementDeclaration]]=None, - registry: PulseRegistryType=None) -> None: + registry: PulseRegistryType=None, + allow_subtemplate_concatenation: bool = True) -> None: """Create a new SequencePulseTemplate instance. Requires a (correctly ordered) list of subtemplates in the form @@ -75,6 +76,7 @@ def __init__(self, + f' defined {defined_channels} vs. subtemplate {subtemplate.defined_channels}') self._register(registry=registry) + self._allow_subtemplate_concatenation = allow_subtemplate_concatenation @classmethod def concatenate(cls, *pulse_templates: Union[PulseTemplate, MappingTuple], **kwargs) -> 'SequencePulseTemplate': @@ -92,7 +94,8 @@ def concatenate(cls, *pulse_templates: Union[PulseTemplate, MappingTuple], **kwa if (isinstance(pt, SequencePulseTemplate) and pt.identifier is None and not pt.measurement_declarations - and not pt.parameter_constraints): + and not pt.parameter_constraints + and pt._allow_subtemplate_concatenation): parsed.extend(pt.subtemplates) else: parsed.append(pt) diff --git a/qupulse/pulses/time_manipulation_pulse_template.py b/qupulse/pulses/time_manipulation_pulse_template.py new file mode 100644 index 00000000..b28632e7 --- /dev/null +++ b/qupulse/pulses/time_manipulation_pulse_template.py @@ -0,0 +1,99 @@ +import numbers +from typing import Dict, Optional, Set, Union, List, Iterable + +from qupulse import ChannelID +from qupulse._program.transformation import Transformation +from qupulse.parameter_scope import Scope +from qupulse.pulses.pulse_template import PulseTemplate +from qupulse.pulses import ConstantPT, SequencePT +from qupulse.expressions import ExpressionLike, ExpressionScalar +from qupulse._program.waveforms import ConstantWaveform +from qupulse.program import ProgramBuilder +from qupulse.pulses.parameters import ConstraintLike +from qupulse.pulses.measurement import MeasurementDeclaration +from qupulse.serialization import PulseRegistryType + + +def _evaluate_expression_dict(expression_dict: Dict[str, ExpressionScalar], scope: Scope) -> Dict[str, float]: + return {ch: value.evaluate_in_scope(scope) + for ch, value in expression_dict.items()} + + +class TimeExtensionPulseTemplate(SequencePT): + """Extend the given pulse template with a constant(?) prefix and/or suffix. + Both start and stop are defined as positive quantities. + """ + + @property + def parameter_names(self) -> Set[str]: + return self._extend_inner.parameter_names | set(self._extend_stop.variables) | set(self._extend_start.variables) + + def __init__(self, inner: PulseTemplate, start: ExpressionLike, stop: ExpressionLike, + *, + parameter_constraints: Optional[Iterable[ConstraintLike]]=None, + measurements: Optional[List[MeasurementDeclaration]]=None, + identifier: Optional[str] = None, + registry: PulseRegistryType = None): + + self._extend_inner = inner + self._extend_start = ExpressionScalar(start) + self._extend_stop = ExpressionScalar(stop) + + id_base = identifier if identifier is not None else "" + + start_pt = ConstantPT(self._extend_start,self._extend_inner.initial_values,identifier=id_base+f"__prepend_{id(self)}") + stop_pt = ConstantPT(self._extend_stop,self._extend_inner.final_values,identifier=id_base+f"__postpend_{id(self)}") + + super().__init__(start_pt,self._extend_inner,stop_pt,identifier=identifier, + parameter_constraints=parameter_constraints, + measurements=measurements, + registry=registry) + + +class SingleWFTimeExtensionPulseTemplate(SequencePT): + """Extend the given pulse template with a constant(?) prefix and/or suffix. + Both start and stop are defined as positive quantities. + """ + + @property + def parameter_names(self) -> Set[str]: + return self._extend_inner.parameter_names | set(self._extend_stop.variables) | set(self._extend_start.variables) + + def _create_program(self, *, + scope: Scope, + measurement_mapping: Dict[str, Optional[str]], + channel_mapping: Dict[ChannelID, Optional[ChannelID]], + global_transformation: Optional[Transformation], + to_single_waveform: Set[Union[str, 'PulseTemplate']], + program_builder: ProgramBuilder): + + super()._create_program(scope=scope, + measurement_mapping=measurement_mapping, + channel_mapping=channel_mapping, + global_transformation=global_transformation, + to_single_waveform=to_single_waveform | {self}, + program_builder=program_builder) + + def __init__(self, inner: PulseTemplate, start: ExpressionLike, stop: ExpressionLike, + *, + parameter_constraints: Optional[Iterable[ConstraintLike]]=None, + measurements: Optional[List[MeasurementDeclaration]]=None, + identifier: Optional[str] = None, + registry: PulseRegistryType = None): + + self._extend_inner = inner + self._extend_start = ExpressionScalar(start) + self._extend_stop = ExpressionScalar(stop) + + id_base = identifier if identifier is not None else "" + + start_pt = ConstantPT(self._extend_start,self._extend_inner.initial_values,identifier=id_base+f"__prepend_{id(self)}") + stop_pt = ConstantPT(self._extend_stop,self._extend_inner.final_values,identifier=id_base+f"__postpend_{id(self)}") + + super().__init__(start_pt,self._extend_inner,stop_pt,identifier=identifier, + parameter_constraints=parameter_constraints, + measurements=measurements, + registry=registry, + allow_subtemplate_concatenation=False) + + diff --git a/qupulse/utils/__init__.py b/qupulse/utils/__init__.py index 326072f4..9dd614a1 100644 --- a/qupulse/utils/__init__.py +++ b/qupulse/utils/__init__.py @@ -26,7 +26,7 @@ __all__ = ["checked_int_cast", "is_integer", "isclose", "pairwise", "replace_multiple", "cached_property", - "forced_hash", "to_next_multiple"] + "forced_hash", "to_next_multiple", "next_multiple_of"] def checked_int_cast(x: Union[float, int, numpy.ndarray], epsilon: float=1e-6) -> int: @@ -149,4 +149,9 @@ def to_next_multiple(sample_rate: ExpressionLike, quantum: int, else: #still return 0 if duration==0 return lambda duration: ExpressionScalar(f'{quantum}/({sample_rate})*Max({min_quanta},-(-{duration}*{sample_rate}//{quantum}))*Max(0, sign({duration}))') - \ No newline at end of file + + +def next_multiple_of(duration: ExpressionLike, sample_rate: ExpressionLike, quantum: int, + min_quanta: Optional[int] = None) -> ExpressionScalar: + """thin wrapper around to_next_multiple to directly call the function""" + return to_next_multiple(sample_rate,quantum,min_quanta)(ExpressionScalar(duration)) \ No newline at end of file diff --git a/tests/pulses/pulse_template_tests.py b/tests/pulses/pulse_template_tests.py index ad82dba9..3f5e4c4a 100644 --- a/tests/pulses/pulse_template_tests.py +++ b/tests/pulses/pulse_template_tests.py @@ -36,6 +36,7 @@ def __init__(self, identifier=None, parameter_names=None, measurement_names=None, final_values=None, + initial_values=None, registry=None): super().__init__(identifier=identifier) @@ -44,6 +45,7 @@ def __init__(self, identifier=None, self._parameter_names = parameter_names self._measurement_names = set() if measurement_names is None else measurement_names self._final_values = final_values + self._initial_values = initial_values self.internal_create_program_args = [] self._register(registry=registry) @@ -93,7 +95,10 @@ def integral(self) -> Dict[ChannelID, ExpressionScalar]: @property def initial_values(self) -> Dict[ChannelID, ExpressionScalar]: - raise NotImplementedError() + if self._initial_values is None: + raise NotImplementedError() + else: + return self._initial_values @property def final_values(self) -> Dict[ChannelID, ExpressionScalar]: @@ -372,72 +377,79 @@ def test_create_program_volatile(self): _internal_create_program.assert_called_once_with(**expected_internal_kwargs, program_builder=program_builder) def test_pad_to(self): - from qupulse.pulses import SequencePT - + from qupulse.pulses import TimeExtensionPT, SingleWFTimeExtensionPT + def to_multiple_of_192(x: Expression) -> Expression: return (x + 191) // 192 * 192 - + final_values = frozendict.frozendict({'A': ExpressionScalar(0.1), 'B': ExpressionScalar('a')}) + initial_values = frozendict.frozendict({'A': ExpressionScalar(-0.1), 'B': ExpressionScalar('a')}) measurements = [('M', 0, 'y')] - + pt = PulseTemplateStub(duration=ExpressionScalar(10)) padded = pt.pad_to(10) self.assertIs(pt, padded) - + pt = PulseTemplateStub(duration=ExpressionScalar('duration')) padded = pt.pad_to('duration') self.assertIs(pt, padded) - - # padding with numeric durations - - pt = PulseTemplateStub(duration=ExpressionScalar(10), - final_values=final_values, - defined_channels=final_values.keys()) - padded = pt.pad_to(20) - self.assertEqual(padded.duration, 20) - self.assertEqual(padded.final_values, final_values) - self.assertIsInstance(padded, SequencePT) - self.assertIs(padded.subtemplates[0], pt) - - padded = pt.pad_to(20, pt_kwargs=dict(measurements=measurements)) - self.assertEqual(padded.duration, 20) - self.assertEqual(padded.final_values, final_values) - self.assertIsInstance(padded, SequencePT) - self.assertIs(padded.subtemplates[0], pt) - self.assertEqual(measurements, padded.measurement_declarations) - - padded = pt.pad_to(10, pt_kwargs=dict(measurements=measurements)) - self.assertEqual(padded.duration, 10) - self.assertEqual(padded.final_values, final_values) - self.assertIsInstance(padded, SequencePT) - self.assertIs(padded.subtemplates[0], pt) - self.assertEqual(measurements, padded.measurement_declarations) - - # padding with numeric duation and callable - padded = pt.pad_to(to_multiple_of_192) - self.assertEqual(padded.duration, 192) - self.assertEqual(padded.final_values, final_values) - self.assertIsInstance(padded, SequencePT) - self.assertIs(padded.subtemplates[0], pt) - - # padding with symbolic durations - - pt = PulseTemplateStub(duration=ExpressionScalar('duration'), - final_values=final_values, - defined_channels=final_values.keys()) - padded = pt.pad_to('new_duration') - self.assertEqual(padded.duration, 'new_duration') - self.assertEqual(padded.final_values, final_values) - self.assertIsInstance(padded, SequencePT) - self.assertIs(padded.subtemplates[0], pt) - - # padding symbolic durations with callable - - padded = pt.pad_to(to_multiple_of_192) - self.assertEqual(padded.duration, '(duration + 191) // 192 * 192') - self.assertEqual(padded.final_values, final_values) - self.assertIsInstance(padded, SequencePT) - self.assertIs(padded.subtemplates[0], pt) + + + for swf_bool, class_type in zip([False,True],[TimeExtensionPT, SingleWFTimeExtensionPT]): + + # padding with numeric durations + + pt = PulseTemplateStub(duration=ExpressionScalar(10), + final_values=final_values, + initial_values=initial_values, + defined_channels=final_values.keys()) + + padded = pt.pad_to(20,as_single_wf=swf_bool) + self.assertEqual(padded.duration, 20) + self.assertEqual(padded.final_values, final_values) + self.assertIsInstance(padded, class_type) + self.assertIs(padded.subtemplates[1], pt) + + padded = pt.pad_to(20,as_single_wf=swf_bool, pt_kwargs=dict(measurements=measurements)) + self.assertEqual(padded.duration, 20) + self.assertEqual(padded.final_values, final_values) + self.assertIsInstance(padded, class_type) + self.assertIs(padded.subtemplates[1], pt) + self.assertEqual(measurements, padded.measurement_declarations) + + padded = pt.pad_to(10,as_single_wf=swf_bool, pt_kwargs=dict(measurements=measurements)) + self.assertEqual(padded.duration, 10) + self.assertEqual(padded.final_values, final_values) + self.assertIsInstance(padded, class_type) + self.assertIs(padded.subtemplates[1], pt) + self.assertEqual(measurements, padded.measurement_declarations) + + # padding with numeric duation and callable + padded = pt.pad_to(to_multiple_of_192,as_single_wf=swf_bool) + self.assertEqual(padded.duration, 192) + self.assertEqual(padded.final_values, final_values) + self.assertIsInstance(padded, class_type) + self.assertIs(padded.subtemplates[1], pt) + + # padding with symbolic durations + + pt = PulseTemplateStub(duration=ExpressionScalar('duration'), + final_values=final_values, + initial_values=initial_values, + defined_channels=final_values.keys()) + padded = pt.pad_to('new_duration',as_single_wf=swf_bool) + self.assertEqual(padded.duration, 'new_duration') + self.assertEqual(padded.final_values, final_values) + self.assertIsInstance(padded, class_type) + self.assertIs(padded.subtemplates[1], pt) + + # padding symbolic durations with callable + + padded = pt.pad_to(to_multiple_of_192,as_single_wf=swf_bool) + self.assertEqual(padded.duration, '(duration + 191) // 192 * 192') + self.assertEqual(padded.final_values, final_values) + self.assertIsInstance(padded, class_type) + self.assertIs(padded.subtemplates[1], pt) @mock.patch('qupulse.pulses.pulse_template.default_program_builder') def test_create_program_none(self, pb_mock) -> None: diff --git a/tests/pulses/time_extension_pulse_tests.py b/tests/pulses/time_extension_pulse_tests.py new file mode 100644 index 00000000..22702445 --- /dev/null +++ b/tests/pulses/time_extension_pulse_tests.py @@ -0,0 +1,83 @@ +import unittest + +from qupulse.pulses import FunctionPT, TimeExtensionPT, SingleWFTimeExtensionPT +from qupulse.program.loop import LoopBuilder +from qupulse.program.linspace import LinSpaceBuilder +from qupulse.utils import to_next_multiple, next_multiple_of +from qupulse.utils.types import TimeType + +class TimeExtensionPulseTemplateTests(unittest.TestCase): + + def setUp(self): + self.main_pt = FunctionPT("tanh(a*t**2 + b*t + c) * sin(b*t + c) + cos(a*t)/2",192*1e1,channel="a") + + self.extend_prior = SingleWFTimeExtensionPT(self.main_pt,"t_prior","t_z") + self.extend_posterior = SingleWFTimeExtensionPT(self.main_pt,"t_z","t_posterior") + self.extend_both = SingleWFTimeExtensionPT(self.main_pt,"t_prior","t_posterior") + + self.parameters = dict(t_prior=256.,t_posterior=512.,t_z=0., + a=1.,b=0.5,c=1. + ) + + self.sequenced_pt = self.extend_prior @ self.extend_posterior @ self.extend_both + + self.sequenced_extended_pt = TimeExtensionPT(self.sequenced_pt,"t_prior","t_posterior") + + def test_loopbuilder(self): + self.extend_both.create_program(program_builder=LoopBuilder(), + parameters=self.parameters) + + self.sequenced_pt.create_program(program_builder=LoopBuilder(), + parameters=self.parameters) + + self.sequenced_extended_pt.create_program(program_builder=LoopBuilder(), + parameters=self.parameters) + + def test_linspacebuilder(self): + self.extend_both.create_program(program_builder=LinSpaceBuilder(('a',)), + parameters=self.parameters) + + self.sequenced_pt.create_program(program_builder=LinSpaceBuilder(('a',)), + parameters=self.parameters) + + self.sequenced_extended_pt.create_program(program_builder=LinSpaceBuilder(('a',)), + parameters=self.parameters) + + def test_quantities(self): + + self.assertEqual(self.extend_both.defined_channels, self.main_pt.defined_channels) + + + duration_extended = self.extend_prior.duration.evaluate_in_scope(self.parameters) + duration_summed = self.main_pt.duration.evaluate_in_scope(self.parameters)\ + + self.parameters['t_prior'] + self.assertEqual(duration_extended, duration_summed) + + duration_extended = self.extend_posterior.duration.evaluate_in_scope(self.parameters) + duration_summed = self.main_pt.duration.evaluate_in_scope(self.parameters)\ + + self.parameters['t_posterior'] + self.assertEqual(duration_extended, duration_summed) + + duration_extended = self.extend_both.duration.evaluate_in_scope(self.parameters) + duration_summed = self.main_pt.duration.evaluate_in_scope(self.parameters)\ + + self.parameters['t_prior'] + self.parameters['t_posterior'] + self.assertEqual(duration_extended, duration_summed) + + def test_pad_to_usecase(self): + + main_pt = FunctionPT("tanh(a*t**2 + b*t + c) * sin(b*t + c) + cos(a*t)/2","t_main",channel="a") + + extended_pt = TimeExtensionPT(main_pt.pad_to(to_next_multiple("sample_rate",16,4)), + start=0., + stop=next_multiple_of("t_posterior","sample_rate",16) + ) + + parameters = dict(t_main=13.8437652,t_posterior=654., + a=1.,b=0.5,c=1., + sample_rate=TimeType(12,5), + ) + + prog = extended_pt.create_program(program_builder=LoopBuilder(), + parameters=parameters) + + self.assertEqual(prog.duration, TimeType(2060, 3)) \ No newline at end of file