diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
new file mode 100644
index 0000000..1756b21
--- /dev/null
+++ b/.git-blame-ignore-revs
@@ -0,0 +1,2 @@
+63d994fa6294d87eead1f8502d1476eca3a84148 # apply new pre-commit rules
+b20a1b955f654202f2014a29ab834ec1f3915e35 # reorganize workflows
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index fedde03..d8cfad7 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,27 +1,17 @@
repos:
- repo: https://github.com/psf/black
- rev: 22.12.0
+ rev: 23.3.0
hooks:
- id: black
language_version: python3 # Should be a command that runs python3.6+
- - repo: https://github.com/pycqa/flake8
- rev: '6.0.0'
+ - repo: https://github.com/astral-sh/ruff-pre-commit
+ rev: v0.0.270
hooks:
- - id: flake8
- args: [--count, --show-source, --statistics, --version]
- additional_dependencies:
- - flake8-bugbear
- - flake8-builtins
- - flake8-comprehensions
+ - id: ruff
+ args: [--show-source]
- repo: https://github.com/kynan/nbstripout
rev: 0.6.1
hooks:
- id: nbstripout
-
- - repo: https://github.com/asottile/pyupgrade
- rev: v3.3.0
- hooks:
- - id: pyupgrade
- args: [--py39-plus]
diff --git a/aiidalab_ispg/app/atmospec_steps.py b/aiidalab_ispg/app/atmospec_steps.py
index 0eeb25b..396752f 100644
--- a/aiidalab_ispg/app/atmospec_steps.py
+++ b/aiidalab_ispg/app/atmospec_steps.py
@@ -8,11 +8,10 @@
import traitlets
from aiida.engine import submit, ProcessState
-from aiida.orm import Bool, StructureData, TrajectoryData, WorkChainNode
+from aiida.orm import Bool
from aiida.orm import load_code, load_node
from aiida.plugins import WorkflowFactory
-from aiidalab_widgets_base import WizardAppWidgetStep
from .input_widgets import (
ExcitedStateMethod,
@@ -335,9 +334,8 @@ def submit(self, _=None):
nroots=bp.nstates,
)
else:
- raise NotImplementedError(
- f"Excited method {bp.excited_method} not implemented"
- )
+ msg = f"Excited method {bp.excited_method} not implemented"
+ raise NotImplementedError(msg)
builder.optimize = bp.optimize
builder.opt.orca.parameters = gs_opt_parameters
@@ -454,7 +452,6 @@ def _observe_status(self, change):
class ViewAtmospecAppWorkChainStatusAndResultsStep(ViewWorkChainStatusStep):
-
workflow_status = traitlets.Instance(AtmospecWorkflowStatus, allow_none=True)
def __init__(self, **kwargs):
diff --git a/aiidalab_ispg/app/conformers.py b/aiidalab_ispg/app/conformers.py
index 4233bd7..289c44d 100644
--- a/aiidalab_ispg/app/conformers.py
+++ b/aiidalab_ispg/app/conformers.py
@@ -69,7 +69,6 @@ class FFMethod(Enum):
class ConformerSmilesWidget(SmilesWidget):
-
structure = Union(
[Instance(Atoms), Instance(StructureData), Instance(TrajectoryData)],
allow_none=True,
@@ -232,7 +231,10 @@ def _filter_and_sort_conformers(self, conformers, energies):
selected_energies.append(shifted_energy)
return selected_conformers, selected_energies
- def _rdkit_opt(self, smiles, steps, algo=RDKitMethod.ETKDGV1, opt_algo=None):
+ # TODO: Refactor this to smaller functions
+ def _rdkit_opt( # noqa: C901
+ self, smiles, steps, algo=RDKitMethod.ETKDGV1, opt_algo=None
+ ):
"""Optimize a molecule using force field and rdkit (needed for complex SMILES)."""
if self.debug:
@@ -273,7 +275,8 @@ def _rdkit_opt(self, smiles, steps, algo=RDKitMethod.ETKDGV1, opt_algo=None):
mol, numConfs=num_confs, params=params
)
if len(conf_ids) == 0:
- raise ValueError("Failed to generate conformers with RDKit")
+ msg = "Failed to generate conformers with RDKit"
+ raise ValueError(msg)
ffenergies = None
if opt_algo == FFMethod.UFF and AllChem.UFFHasAllMoleculeParams(mol):
diff --git a/aiidalab_ispg/app/experimental_spectra/import_experimental_spectrum.py b/aiidalab_ispg/app/experimental_spectra/import_experimental_spectrum.py
index 7c02d70..2f4e3ea 100644
--- a/aiidalab_ispg/app/experimental_spectra/import_experimental_spectrum.py
+++ b/aiidalab_ispg/app/experimental_spectra/import_experimental_spectrum.py
@@ -1,3 +1,4 @@
+# ruff: noqa: INP001
# This script needs to be run with `verdi run`
import argparse
from pprint import pprint
@@ -7,6 +8,9 @@
import numpy as np
from rdkit import Chem
+from aiida.orm import QueryBuilder
+from aiida.plugins import DataFactory
+
XyData = DataFactory("array.xy")
@@ -33,15 +37,16 @@ def parse_cmd():
def canonicalize_smiles(smiles):
mol = Chem.MolFromSmiles(smiles, sanitize=True)
if mol is None:
- raise ValueError("RDkit ERROR: Invalid SMILES string")
+ msg = "RDkit ERROR: Invalid SMILES string"
+ raise ValueError(msg)
canonical_smiles = Chem.MolToSmiles(mol, isomericSmiles=True, canonical=True)
if not canonical_smiles:
- raise ValueError("RDKit: Could not canonicalize SMILES")
+ msg = "RDKit: Could not canonicalize SMILES"
+ raise ValueError(msg)
return canonical_smiles
def main(input_file, dry_run=True):
-
with open(opts.input_file) as f:
data = yaml.safe_load(f)
diff --git a/aiidalab_ispg/app/input_widgets.py b/aiidalab_ispg/app/input_widgets.py
index 15bca30..f6f2209 100644
--- a/aiidalab_ispg/app/input_widgets.py
+++ b/aiidalab_ispg/app/input_widgets.py
@@ -42,7 +42,6 @@ class ExcitedStateMethod(Enum):
class MolecularGeometrySettings(ipw.VBox):
-
title = ipw.HTML(
"""
Molecular geometry
"""
@@ -234,14 +233,13 @@ def _observe_excited_method(self, change):
self.tddft_functional.disabled = False
def reset(self):
- self.excited_method.value = _DEFAULT_EXCITED_METHOD
+ self.excited_method.value = self._DEFAULT_EXCITED_METHOD
self.method.value = self._DEFAULT_FUNCTIONAL
self.basis.value = self._DEFAULT_BASIS
self.nstate.value = self._DEFAULT_NSTATES
class WignerSamplingSettings(ipw.VBox):
-
disabled = traitlets.Bool(default=False)
title = ipw.HTML(
@@ -296,7 +294,6 @@ def reset(self):
class CodeSettings(ipw.VBox):
-
codes_title = ipw.HTML(
"""
Codes
"""
@@ -311,7 +308,6 @@ class CodeSettings(ipw.VBox):
_DEFAULT_ORCA_CODES = ("orca@slurm", "orca@localhost")
def __init__(self, **kwargs):
-
self.orca = ComputationalResourcesWidget(
default_calc_job_plugin="orca.orca",
description="ORCA program",
diff --git a/aiidalab_ispg/app/optimization_steps.py b/aiidalab_ispg/app/optimization_steps.py
index 91c3e9c..8b9951f 100644
--- a/aiidalab_ispg/app/optimization_steps.py
+++ b/aiidalab_ispg/app/optimization_steps.py
@@ -8,16 +8,10 @@
import traitlets
from aiida.engine import submit, ProcessState
-from aiida.orm import Bool, StructureData, TrajectoryData, WorkChainNode
+from aiida.orm import Bool
from aiida.orm import load_code, load_node
from aiida.plugins import WorkflowFactory
-from aiidalab_widgets_base import (
- WizardAppWidgetStep,
- AiidaNodeViewWidget,
- ProcessMonitor,
- ProcessNodesTreeWidget,
-)
from .input_widgets import (
ResourceSelectionWidget,
@@ -27,7 +21,6 @@
)
from .widgets import TrajectoryDataViewer, spinner
from .steps import SubmitWorkChainStepBase, ViewWorkChainStatusStep
-from .utils import MEMORY_PER_CPU
ConformerOptimizationWorkChain = WorkflowFactory("ispg.conformer_opt")
diff --git a/aiidalab_ispg/app/qeapp/process.py b/aiidalab_ispg/app/qeapp/process.py
index cf71611..8dd5a0f 100644
--- a/aiidalab_ispg/app/qeapp/process.py
+++ b/aiidalab_ispg/app/qeapp/process.py
@@ -3,7 +3,6 @@
import ipywidgets as ipw
import traitlets as tl
-from aiida import orm
from aiida.tools.query.calculation import CalculationQueryBuilder
diff --git a/aiidalab_ispg/app/qeapp/widgets.py b/aiidalab_ispg/app/qeapp/widgets.py
index e2bd564..70325b6 100644
--- a/aiidalab_ispg/app/qeapp/widgets.py
+++ b/aiidalab_ispg/app/qeapp/widgets.py
@@ -24,7 +24,6 @@
class RollingOutput(ipw.VBox):
-
style = (
"background-color: #253239; color: #cdd3df; line-height: normal; custom=test"
)
@@ -107,7 +106,8 @@ def _default_tooltip(self):
return "Download"
def __on_click(self, _):
- digest = hashlib.md5(self.payload).hexdigest() # bypass browser cache
+ # bypass browser cache
+ digest = hashlib.md5(self.payload).hexdigest() # noqa: S324
payload = base64.b64encode(self.payload).decode()
link_id = f"dl_{digest}"
@@ -134,7 +134,6 @@ def __on_click(self, _):
class FilenameDisplayWidget(ipw.Box):
-
value = traitlets.Unicode()
def __init__(self, max_width=None, **kwargs):
@@ -158,7 +157,6 @@ def _observe_filename(self, change):
class LogOutputWidget(ipw.VBox):
-
filename = traitlets.Unicode()
value = traitlets.Unicode()
@@ -236,7 +234,6 @@ def _observe_value(self, change):
class CalcJobOutputFollower(traitlets.HasTraits):
-
calcjob_uuid = traitlets.Unicode(allow_none=True)
filename = traitlets.Unicode(allow_none=True)
output = traitlets.List(trait=traitlets.Unicode)
diff --git a/aiidalab_ispg/app/spectrum.py b/aiidalab_ispg/app/spectrum.py
index fb55a2e..d75411d 100644
--- a/aiidalab_ispg/app/spectrum.py
+++ b/aiidalab_ispg/app/spectrum.py
@@ -147,7 +147,8 @@ def get_spectrum(
elif kernel is BroadeningKernel.LORENTZ:
self._calc_lorentzian_spectrum(x, y, width)
else:
- raise ValueError(f"Invalid broadening kernel {kernel}")
+ msg = f"Invalid broadening kernel {kernel}"
+ raise ValueError(msg)
# Conversion factor from eV to given energy unit
if x_unit == EnergyUnit.NM:
@@ -170,7 +171,6 @@ def _convert_to_nanometers(self, x, y):
class SpectrumWidget(ipw.VBox):
-
disabled = traitlets.Bool(default=True)
conformer_transitions = traitlets.List(
trait=traitlets.Dict, allow_none=True, default=None
@@ -645,7 +645,8 @@ def _validate_conformers(self, change):
if not all(
self._validate_transitions(c["transitions"]) for c in conformer_transitions
):
- raise ValueError("Invalid conformer transitions")
+ msg = "Invalid conformer transitions"
+ raise ValueError(msg)
return conformer_transitions
@traitlets.validate("conformer_structures")
@@ -659,7 +660,8 @@ def _validate_conformer_structures(self, change):
elif isinstance(structures, StructureData):
return TrajectoryData(structurelist=(structures,))
else:
- raise ValueError(f"Unsupported type {type(structures)}")
+ msg = f"Unsupported type {type(structures)}"
+ raise ValueError(msg)
@traitlets.observe("selected_conformer_id")
def _observe_selected_conformer(self, change):
@@ -704,7 +706,7 @@ def find_experimental_spectrum_by_smiles(self, smiles: str):
and plot it if it is available in our DB"""
self.set_trait("experimental_spectrum_uuid", None)
- if smiles is None or smiles == "":
+ if not smiles:
return
qb = QueryBuilder()
diff --git a/aiidalab_ispg/app/spectrum_analysis.py b/aiidalab_ispg/app/spectrum_analysis.py
index e0d0653..d6cd6c7 100644
--- a/aiidalab_ispg/app/spectrum_analysis.py
+++ b/aiidalab_ispg/app/spectrum_analysis.py
@@ -5,16 +5,13 @@
"""
from dataclasses import dataclass
-from enum import Enum, unique
import bokeh.plotting as plt
import bokeh.palettes
-from bokeh.models import ColumnDataSource, Scatter
import ipywidgets as ipw
import traitlets
import scipy
-from scipy import constants
import numpy as np
from .utils import BokehFigureContext
@@ -105,8 +102,8 @@ def _init_figure(self, *args, **kwargs) -> BokehFigureContext:
"""Initialize Bokeh figure. Arguments are passed to bokeh.plt.figure()"""
figure = BokehFigureContext(plt.figure(*args, **kwargs))
f = figure.get_figure()
- f.xaxis.axis_label = f"Excitation Energy (eV)"
- f.yaxis.axis_label = f"Oscillator strength (-)"
+ f.xaxis.axis_label = "Excitation Energy (eV)"
+ f.yaxis.axis_label = "Oscillator strength (-)"
return figure
@traitlets.observe("conformer_transitions")
@@ -130,7 +127,8 @@ def _update_density_plot(self, plot_type: str):
elif plot_type == "DENSITY":
self.plot_density(energies, osc_strengths)
else:
- raise ValueError(f"Unexpected value for toggle: {plot_type}")
+ msg = f"Unexpected value for toggle: {plot_type}"
+ raise ValueError(msg)
def _flatten_transitions(self):
# Flatten transitions for all conformers.
diff --git a/aiidalab_ispg/app/steps.py b/aiidalab_ispg/app/steps.py
index 67274ec..7c90c64 100644
--- a/aiidalab_ispg/app/steps.py
+++ b/aiidalab_ispg/app/steps.py
@@ -5,15 +5,12 @@
* Daniel Hollas
"""
import ipywidgets as ipw
-import numpy as np
import re
import traitlets
-from aiida.common import MissingEntryPointError, LinkType
-from aiida.engine import ProcessState, submit
+from aiida.engine import ProcessState
from aiida.orm import load_node
from aiida.orm import WorkChainNode, StructureData, TrajectoryData
-from aiida.plugins import WorkflowFactory
from aiidalab_widgets_base import (
AiidaNodeViewWidget,
@@ -87,7 +84,8 @@ def _on_submit_button_clicked(self, _):
def submit(self):
"""Submit workflow, implementation must be provided by the the child class"""
- raise NotImplementedError("FATAL: submit method not implemented")
+ msg = "FATAL: submit method not implemented"
+ raise NotImplementedError(msg)
def _get_state(self):
# Process is already running.
@@ -212,11 +210,9 @@ def _update_workflow_state(self, _=None):
"""To be implemented by child workflows
to power the workflow-specific progress bar
"""
- pass
def _display_results(self, _=None):
"""Optional function to be called when the process is finished"""
- pass
@traitlets.observe("process_uuid")
def _observe_process(self, change):
diff --git a/aiidalab_ispg/app/utils.py b/aiidalab_ispg/app/utils.py
index f4a5085..f26588d 100644
--- a/aiidalab_ispg/app/utils.py
+++ b/aiidalab_ispg/app/utils.py
@@ -26,6 +26,7 @@
# https://github.com/pzarabadip/aiida-orca/issues/45
MEMORY_PER_CPU = 3000 # Mb
+
# TODO: Use numpy here? Measure the speed...
# Energies expected in kJ / mole, Absolute temperature in Kelvins
def calc_boltzmann_weights(energies, T):
@@ -48,7 +49,8 @@ def get_formula(data_node):
elif isinstance(data_node, CifData):
return data_node.get_ase().get_chemical_formula()
else:
- raise ValueError(f"Cannot get formula from node {type(data_node)}")
+ msg = f"Cannot get formula from node {type(data_node)}"
+ raise ValueError(msg)
# https://stackoverflow.com/a/3382369
diff --git a/aiidalab_ispg/app/widgets.py b/aiidalab_ispg/app/widgets.py
index c190afa..26232f5 100644
--- a/aiidalab_ispg/app/widgets.py
+++ b/aiidalab_ispg/app/widgets.py
@@ -6,18 +6,15 @@
"""
import base64
-from enum import Enum, unique
import io
import ipywidgets as ipw
import traitlets
import nglview
-from dataclasses import dataclass
import ase
from ase import Atoms
-from aiida.tools.query.calculation import CalculationQueryBuilder
from aiida.orm import load_node, Node, Data
from aiida.plugins import DataFactory
@@ -56,7 +53,6 @@ def parse_extra_info(self, pk: int) -> dict:
@register_viewer_widget("data.core.array.trajectory.TrajectoryData.")
class TrajectoryDataViewer(StructureDataViewer):
-
# TODO: Do not subclass StructureDataViewer, but have it as a component
trajectory = traitlets.Instance(Node, allow_none=True)
selected_structure_id = traitlets.Int(allow_none=True)
@@ -66,7 +62,6 @@ class TrajectoryDataViewer(StructureDataViewer):
_boltzmann_weights = None
def __init__(self, trajectory=None, configuration_tabs=None, **kwargs):
-
if configuration_tabs is None:
configuration_tabs = ["Selection", "Download"]
@@ -223,7 +218,6 @@ def __init__(
node_class=None,
**kwargs,
):
-
# History of modifications
self.history = []
@@ -268,11 +262,8 @@ def __init__(
elif node_class in self.SUPPORTED_DATA_FORMATS:
self.node_class = node_class
else:
- raise ValueError(
- "Unknown data format '{}'. Options: {}".format(
- node_class, list(self.SUPPORTED_DATA_FORMATS.keys())
- )
- )
+ msg = f"Unknown data format '{node_class}'. Options: {list(self.SUPPORTED_DATA_FORMATS.keys())}"
+ raise ValueError(msg)
self.output = ipw.HTML("")
@@ -280,12 +271,15 @@ def __init__(
self._structure_importers(importers),
self.viewer,
ipw.HBox(
- store_and_description
- + [self.structure_label, self.structure_description]
+ [
+ *store_and_description,
+ self.structure_label,
+ self.structure_description,
+ ]
),
]
- super(ipw.VBox, self).__init__(children=children + [self.output], **kwargs)
+ super(ipw.VBox, self).__init__(children=[*children, self.output], **kwargs)
def _convert_to_structure_node(self, structure):
"""Convert structure of any type to the StructureNode object."""
@@ -324,7 +318,8 @@ def _convert_to_structure_node(self, structure):
structurelist=(StructureData(ase=structure.get_ase()),)
)
else:
- raise ValueError(f"Unexpected node type {type(structure)}")
+ msg = f"Unexpected node type {type(structure)}"
+ raise ValueError(msg)
# Using self.structure, as it was already converted to the ASE Atoms object.
return structure_node_type(ase=self.structure)
@@ -430,14 +425,13 @@ def __init__(self, dismissible=False, *args, **kwargs):
def show(self, message):
"""Show the warning."""
+ dismiss = ""
+ alert_classes = "alert alert-danger"
if self.dismissible:
- alert_classes = "alert alert-danger alert-dismissible"
+ alert_classes = f"{alert_classes} alert-dismissible"
dismiss = """×"""
- else:
- alert_classes = "alert alert-danger"
- dismiss = ""
self.value = (
- f"""{dismiss}{message}
"""
+ f"""{dismiss}{message}
"""
)
self.layout.display = "block"
diff --git a/aiidalab_ispg/wigner/__init__.py b/aiidalab_ispg/wigner/__init__.py
index 310ee65..952ac76 100644
--- a/aiidalab_ispg/wigner/__init__.py
+++ b/aiidalab_ispg/wigner/__init__.py
@@ -1 +1,5 @@
from .wigner import Wigner
+
+__all__ = [
+ "Wigner",
+]
diff --git a/aiidalab_ispg/workflows/__init__.py b/aiidalab_ispg/workflows/__init__.py
index f0fefb7..d837524 100644
--- a/aiidalab_ispg/workflows/__init__.py
+++ b/aiidalab_ispg/workflows/__init__.py
@@ -1,400 +1,10 @@
-"""Base work chain to run an ORCA calculation"""
-
-from aiida.engine import WorkChain, calcfunction, ExitCode
-from aiida.engine import append_, ToContext, if_
-
-# not sure if this is needed? Can we use self.run()?
-from aiida.engine import run
-from aiida.plugins import CalculationFactory, WorkflowFactory, DataFactory
-from aiida.orm import to_aiida_type
-from aiida.orm import (
- StructureData,
- TrajectoryData,
- SinglefileData,
- Int,
- Float,
- Bool,
- List,
- Dict,
-)
-
-from aiidalab_ispg.wigner import Wigner
-from .optimization import (
- extract_trajectory_arrays,
- RobustOptimizationWorkChain,
- structures_to_trajectory,
-)
-
-Code = DataFactory("core.code.installed")
-OrcaCalculation = CalculationFactory("orca.orca")
-OrcaBaseWorkChain = WorkflowFactory("orca.base")
-
-
-# Meta WorkChain for combining all inputs from a dynamic namespace into List.
-# Used to combine outputs from several subworkflows into one output.
-# It should be launched via run() instead of submit()
-# NOTE: The code has special handling for Dict nodes,
-# which otherwise fail with not being serializable,
-# so we need the get the value with Dict.get_dict() first.
-# We should check whether this is still needed in aiida-2.0
-# Note we cannot make this more general since List and Dict
-# don't have the .value attribute.
-# https://github.com/aiidateam/aiida-core/issues/5313
-class ConcatInputsToList(WorkChain):
- @classmethod
- def define(cls, spec):
- super().define(spec)
- spec.input_namespace("ns", dynamic=True)
- spec.output("output", valid_type=List)
- spec.outline(cls.combine)
-
- def combine(self):
- input_list = [
- self.inputs.ns[k].get_dict()
- if isinstance(self.inputs.ns[k], Dict)
- else self.inputs.ns[k]
- for k in self.inputs.ns
- ]
- self.out("output", List(list=input_list).store())
-
-
-@calcfunction
-def pick_wigner_structure(wigner_structures, index):
- return wigner_structures.get_step_structure(index.value)
-
-
-@calcfunction
-def add_orca_wf_guess(orca_params: Dict) -> Dict:
- params = orca_params.get_dict()
- params["input_keywords"].append("MOREAD")
- params["input_blocks"]["scf"]["moinp"] = '"aiida_old.gbw"'
- return Dict(params)
-
-
-@calcfunction
-def generate_wigner_structures(
- minimum_structure, orca_output_dict, nsample, low_freq_thr
-):
- seed = orca_output_dict.extras["_aiida_hash"]
- ase_molecule = minimum_structure.get_ase()
- frequencies = orca_output_dict["vibfreqs"]
- normal_modes = orca_output_dict["vibdisps"]
-
- wigner = Wigner(
- ase_molecule,
- frequencies,
- normal_modes,
- seed=seed,
- low_freq_thr=low_freq_thr.value,
- )
-
- wigner_list = [
- StructureData(ase=wigner.get_ase_sample()) for i in range(nsample.value)
- ]
- return TrajectoryData(structurelist=wigner_list)
-
-
-class OrcaWignerSpectrumWorkChain(WorkChain):
- """Top level workchain for Nuclear Ensemble Approach UV/vis
- spectrum for a single conformer"""
-
- def _build_process_label(self):
- return "NEA spectrum workflow"
-
- @classmethod
- def define(cls, spec):
- super().define(spec)
- spec.expose_inputs(
- RobustOptimizationWorkChain,
- namespace="opt",
- exclude=["orca.structure", "orca.code"],
- )
- spec.expose_inputs(
- OrcaBaseWorkChain, namespace="exc", exclude=["orca.structure", "orca.code"]
- )
- spec.input("structure", valid_type=(StructureData, TrajectoryData))
- spec.input("code", valid_type=Code)
-
- # Whether to perform geometry optimization
- spec.input(
- "optimize",
- valid_type=Bool,
- default=lambda: Bool(True),
- serializer=to_aiida_type,
- )
-
- # Number of Wigner geometries (computed only when optimize==True)
- spec.input(
- "nwigner", valid_type=Int, default=lambda: Int(1), serializer=to_aiida_type
- )
-
- spec.input(
- "wigner_low_freq_thr",
- valid_type=Float,
- default=lambda: Float(10),
- serializer=to_aiida_type,
- )
-
- spec.output("relaxed_structure", valid_type=StructureData, required=False)
- spec.output(
- "single_point_excitations",
- valid_type=Dict,
- required=True,
- help="Output parameters from a single-point excitations",
- )
- spec.expose_outputs(
- RobustOptimizationWorkChain,
- namespace="opt",
- include=["output_parameters"],
- namespace_options={"required": False},
- )
-
- spec.output(
- "wigner_excitations",
- valid_type=List,
- required=False,
- help="Output parameters from all Wigner excited state calculation",
- )
-
- spec.outline(
- if_(cls.should_optimize)(
- cls.optimize,
- cls.inspect_optimization,
- ),
- cls.excite,
- cls.inspect_excitation,
- if_(cls.should_run_wigner)(
- cls.wigner_sampling,
- cls.wigner_excite,
- cls.inspect_wigner_excitation,
- ),
- cls.results,
- )
-
- spec.exit_code(
- 401,
- "ERROR_OPTIMIZATION_FAILED",
- "optimization encountered unspecified error",
- )
- spec.exit_code(
- 402, "ERROR_EXCITATION_FAILED", "excited state calculation failed"
- )
-
- def excite(self):
- """Calculate excited states for a single geometry"""
- inputs = self.exposed_inputs(
- OrcaBaseWorkChain, namespace="exc", agglomerate=False
- )
- inputs.orca.code = self.inputs.code
-
- if self.inputs.optimize:
- self.report("Calculating spectrum for optimized geometry")
- inputs.orca.structure = self.ctx.calc_opt.outputs.relaxed_structure
-
- # Pass in converged SCF wavefunction
- with self.ctx.calc_opt.outputs.retrieved.base.repository.open(
- "aiida.gbw", "rb"
- ) as handler:
- gbw_file = SinglefileData(handler)
- inputs.orca.file = {"gbw": gbw_file}
- inputs.orca.parameters = add_orca_wf_guess(inputs.orca.parameters)
- else:
- self.report("Calculating spectrum for input geometry")
- inputs.orca.structure = self.inputs.structure
-
- calc_exc = self.submit(OrcaBaseWorkChain, **inputs)
- calc_exc.label = "single-point-excitation"
- return ToContext(calc_exc=calc_exc)
-
- def wigner_sampling(self):
- self.report(f"Generating {self.inputs.nwigner.value} Wigner geometries")
-
- n_low_freq_vibs = 0
- for freq in self.ctx.calc_opt.outputs.output_parameters["vibfreqs"]:
- if freq < self.inputs.wigner_low_freq_thr:
- n_low_freq_vibs += 1
- if n_low_freq_vibs > 0:
- self.report(
- f"Ignoring {n_low_freq_vibs} vibrations below {self.inputs.wigner_low_freq_thr.value} cm^-1"
- )
-
- self.ctx.wigner_structures = generate_wigner_structures(
- self.ctx.calc_opt.outputs.relaxed_structure,
- self.ctx.calc_opt.outputs.output_parameters,
- self.inputs.nwigner,
- self.inputs.wigner_low_freq_thr,
- )
-
- def wigner_excite(self):
- inputs = self.exposed_inputs(
- OrcaBaseWorkChain, namespace="exc", agglomerate=False
- )
- inputs.orca.code = self.inputs.code
- # Pass in SCF wavefunction from minimum geometry
- with self.ctx.calc_opt.outputs.retrieved.base.repository.open(
- "aiida.gbw", "rb"
- ) as handler:
- gbw_file = SinglefileData(handler)
- inputs.orca.file = {"gbw": gbw_file}
- inputs.orca.parameters = add_orca_wf_guess(inputs.orca.parameters)
- for i in self.ctx.wigner_structures.get_stepids():
- inputs.orca.structure = pick_wigner_structure(
- self.ctx.wigner_structures, Int(i)
- )
- calc = self.submit(OrcaBaseWorkChain, **inputs)
- calc.label = "wigner-excitation"
- self.to_context(wigner_calcs=append_(calc))
-
- def optimize(self):
- """Optimize geometry"""
- inputs = self.exposed_inputs(
- RobustOptimizationWorkChain, namespace="opt", agglomerate=False
- )
- inputs.orca.structure = self.inputs.structure
- inputs.orca.code = self.inputs.code
-
- calc_opt = self.submit(RobustOptimizationWorkChain, **inputs)
- calc_opt.label = "optimization"
- return ToContext(calc_opt=calc_opt)
-
- def inspect_optimization(self):
- """Check whether optimization succeeded"""
- if not self.ctx.calc_opt.is_finished_ok:
- self.report("Optimization failed :-(")
- return self.exit_codes.ERROR_OPTIMIZATION_FAILED
- self.out_many(
- self.exposed_outputs(
- self.ctx.calc_opt,
- RobustOptimizationWorkChain,
- namespace="opt",
- agglomerate=False,
- )
- )
-
- def extract_transitions_from_orca_output(self, orca_output_params):
- return {
- "oscillator_strengths": orca_output_params["etoscs"],
- # Orca returns excited state energies in cm^-1
- # Perhaps we should do the conversion here,
- # to make this less ORCA specific.
- "excitation_energies_cm": orca_output_params["etenergies"],
- }
-
- def inspect_excitation(self):
- """Check whether excitation succeeded"""
- if not self.ctx.calc_exc.is_finished_ok:
- self.report("Single point excitation failed :-(")
- return self.exit_codes.ERROR_EXCITATION_FAILED
-
- transitions = self.extract_transitions_from_orca_output(
- self.ctx.calc_exc.outputs.output_parameters
- )
- self.out("single_point_excitations", Dict(transitions).store())
-
- def inspect_wigner_excitation(self):
- """Check whether all wigner excitations succeeded"""
- for calc in self.ctx.wigner_calcs:
- if not calc.is_finished_ok:
- self.report("Wigner excitation failed :-(")
- return self.exit_codes.ERROR_EXCITATION_FAILED
-
- def should_optimize(self):
- return self.inputs.optimize.value
-
- def should_run_wigner(self):
- return self.should_optimize() and self.inputs.nwigner > 0
-
- def results(self):
- """Expose results from child workchains"""
-
- if self.should_optimize():
- self.out("relaxed_structure", self.ctx.calc_opt.outputs.relaxed_structure)
-
- if self.should_run_wigner():
- all_wigner_data = [
- self.extract_transitions_from_orca_output(wc.outputs.output_parameters)
- for wc in self.ctx.wigner_calcs
- ]
- self.out("wigner_excitations", List(all_wigner_data).store())
-
-
-class AtmospecWorkChain(WorkChain):
- """The top-level ATMOSPEC workchain"""
-
- def _build_process_label(self):
- return "ATMOSPEC workflow"
-
- @classmethod
- def define(cls, spec):
- super().define(spec)
- spec.expose_inputs(OrcaWignerSpectrumWorkChain, exclude=["structure"])
- spec.input("structure", valid_type=TrajectoryData)
-
- spec.output(
- "spectrum_data",
- valid_type=List,
- required=True,
- help="All data necessary to construct spectrum in SpectrumWidget",
- )
-
- spec.output(
- "relaxed_structures",
- valid_type=TrajectoryData,
- required=False,
- help="Minimized structures of all conformers",
- )
-
- spec.outline(
- cls.launch,
- cls.collect,
- )
-
- def launch(self):
- inputs = self.exposed_inputs(OrcaWignerSpectrumWorkChain, agglomerate=False)
- self.report(
- f"Launching ATMOSPEC for {len(self.inputs.structure.get_stepids())} conformers"
- )
- for conf_id in self.inputs.structure.get_stepids():
- inputs.structure = self.inputs.structure.get_step_structure(conf_id)
- workflow = self.submit(OrcaWignerSpectrumWorkChain, **inputs)
- workflow.label = f"atmospec-conf-{conf_id}"
- self.to_context(confs=append_(workflow))
-
- def collect(self):
- for wc in self.ctx.confs:
- if not wc.is_finished_ok:
- return ExitCode(wc.exit_status, wc.exit_message)
-
- conf_outputs = [wc.outputs for wc in self.ctx.confs]
-
- # Combine all spectra data
- if self.inputs.optimize and self.inputs.nwigner > 0:
- data = {
- str(i): outputs.wigner_excitations
- for i, outputs in enumerate(conf_outputs)
- }
- else:
- data = {
- str(i): [outputs.single_point_excitations.get_dict()]
- for i, outputs in enumerate(conf_outputs)
- }
- all_results = run(ConcatInputsToList, ns=data)
- self.out("spectrum_data", all_results["output"])
-
- # Combine all optimized geometries into single TrajectoryData
- if self.inputs.optimize:
- relaxed_structures = {}
- orca_output_params = {}
- for i, outputs in enumerate(conf_outputs):
- relaxed_structures[f"struct_{i}"] = outputs.relaxed_structure
- orca_output_params[f"params_{i}"] = outputs.opt.output_parameters
-
- # For multiple conformers, we're appending relative energies and Boltzmann weights
- array_data = None
- if len(self.ctx.confs) > 1:
- array_data = extract_trajectory_arrays(**orca_output_params)
-
- trajectory = structures_to_trajectory(
- arrays=array_data, **relaxed_structures
- )
- self.out("relaxed_structures", trajectory)
+"""AiiDA workflows for ISPG AiiDAlab applications"""
+from .atmospec import AtmospecWorkChain
+from .harmonic_wigner import generate_wigner_structures
+from .optimization import ConformerOptimizationWorkChain
+
+__all__ = [
+ "AtmospecWorkChain",
+ "generate_wigner_structures",
+ "ConformerOptimizationWorkChain",
+]
diff --git a/aiidalab_ispg/workflows/atmospec.py b/aiidalab_ispg/workflows/atmospec.py
new file mode 100644
index 0000000..d2173fb
--- /dev/null
+++ b/aiidalab_ispg/workflows/atmospec.py
@@ -0,0 +1,339 @@
+"""Base work chain to run an ORCA calculation"""
+
+from aiida.engine import WorkChain, ExitCode
+from aiida.engine import append_, ToContext, if_
+
+# Not sure if this is needed? Can we use self.run()?
+from aiida.engine import run
+from aiida.plugins import CalculationFactory, WorkflowFactory, DataFactory
+from aiida.orm import to_aiida_type
+from aiida.orm import (
+ StructureData,
+ TrajectoryData,
+ SinglefileData,
+ Int,
+ Float,
+ Bool,
+ List,
+ Dict,
+)
+
+from .harmonic_wigner import generate_wigner_structures
+from .optimization import RobustOptimizationWorkChain
+from .utils import (
+ add_orca_wf_guess,
+ ConcatInputsToList,
+ extract_trajectory_arrays,
+ pick_structure_from_trajectory,
+ structures_to_trajectory,
+)
+
+Code = DataFactory("core.code.installed")
+OrcaCalculation = CalculationFactory("orca.orca")
+OrcaBaseWorkChain = WorkflowFactory("orca.base")
+
+
+class OrcaWignerSpectrumWorkChain(WorkChain):
+ """Top level workchain for Nuclear Ensemble Approach UV/vis
+ spectrum for a single conformer"""
+
+ def _build_process_label(self):
+ return "NEA spectrum workflow"
+
+ @classmethod
+ def define(cls, spec):
+ super().define(spec)
+ spec.expose_inputs(
+ RobustOptimizationWorkChain,
+ namespace="opt",
+ exclude=["orca.structure", "orca.code"],
+ )
+ spec.expose_inputs(
+ OrcaBaseWorkChain, namespace="exc", exclude=["orca.structure", "orca.code"]
+ )
+ spec.input("structure", valid_type=(StructureData, TrajectoryData))
+ spec.input("code", valid_type=Code)
+
+ # Whether to perform geometry optimization
+ spec.input(
+ "optimize",
+ valid_type=Bool,
+ default=lambda: Bool(True),
+ serializer=to_aiida_type,
+ )
+
+ # Number of Wigner geometries (computed only when optimize==True)
+ spec.input(
+ "nwigner", valid_type=Int, default=lambda: Int(1), serializer=to_aiida_type
+ )
+
+ spec.input(
+ "wigner_low_freq_thr",
+ valid_type=Float,
+ default=lambda: Float(10),
+ serializer=to_aiida_type,
+ )
+
+ spec.output("relaxed_structure", valid_type=StructureData, required=False)
+ spec.output(
+ "single_point_excitations",
+ valid_type=Dict,
+ required=True,
+ help="Output parameters from a single-point excitations",
+ )
+ spec.expose_outputs(
+ RobustOptimizationWorkChain,
+ namespace="opt",
+ include=["output_parameters"],
+ namespace_options={"required": False},
+ )
+
+ spec.output(
+ "wigner_excitations",
+ valid_type=List,
+ required=False,
+ help="Output parameters from all Wigner excited state calculation",
+ )
+
+ spec.outline(
+ if_(cls.should_optimize)(
+ cls.optimize,
+ cls.inspect_optimization,
+ ),
+ cls.excite,
+ cls.inspect_excitation,
+ if_(cls.should_run_wigner)(
+ cls.wigner_sampling,
+ cls.wigner_excite,
+ cls.inspect_wigner_excitation,
+ ),
+ cls.results,
+ )
+
+ spec.exit_code(
+ 401,
+ "ERROR_OPTIMIZATION_FAILED",
+ "optimization encountered unspecified error",
+ )
+ spec.exit_code(
+ 402, "ERROR_EXCITATION_FAILED", "excited state calculation failed"
+ )
+
+ def excite(self):
+ """Calculate excited states for a single geometry"""
+ inputs = self.exposed_inputs(
+ OrcaBaseWorkChain, namespace="exc", agglomerate=False
+ )
+ inputs.orca.code = self.inputs.code
+
+ if self.inputs.optimize:
+ self.report("Calculating spectrum for optimized geometry")
+ inputs.orca.structure = self.ctx.calc_opt.outputs.relaxed_structure
+
+ # Pass in converged SCF wavefunction
+ with self.ctx.calc_opt.outputs.retrieved.base.repository.open(
+ "aiida.gbw", "rb"
+ ) as handler:
+ gbw_file = SinglefileData(handler)
+ inputs.orca.file = {"gbw": gbw_file}
+ inputs.orca.parameters = add_orca_wf_guess(inputs.orca.parameters)
+ else:
+ self.report("Calculating spectrum for input geometry")
+ inputs.orca.structure = self.inputs.structure
+
+ calc_exc = self.submit(OrcaBaseWorkChain, **inputs)
+ calc_exc.label = "single-point-excitation"
+ return ToContext(calc_exc=calc_exc)
+
+ def wigner_sampling(self):
+ self.report(f"Generating {self.inputs.nwigner.value} Wigner geometries")
+
+ n_low_freq_vibs = 0
+ for freq in self.ctx.calc_opt.outputs.output_parameters["vibfreqs"]:
+ if freq < self.inputs.wigner_low_freq_thr:
+ n_low_freq_vibs += 1
+ if n_low_freq_vibs > 0:
+ self.report(
+ f"Ignoring {n_low_freq_vibs} vibrations below {self.inputs.wigner_low_freq_thr.value} cm^-1"
+ )
+
+ self.ctx.wigner_structures = generate_wigner_structures(
+ self.ctx.calc_opt.outputs.relaxed_structure,
+ self.ctx.calc_opt.outputs.output_parameters,
+ self.inputs.nwigner,
+ self.inputs.wigner_low_freq_thr,
+ )
+
+ def wigner_excite(self):
+ inputs = self.exposed_inputs(
+ OrcaBaseWorkChain, namespace="exc", agglomerate=False
+ )
+ inputs.orca.code = self.inputs.code
+ # Pass in SCF wavefunction from minimum geometry
+ with self.ctx.calc_opt.outputs.retrieved.base.repository.open(
+ "aiida.gbw", "rb"
+ ) as handler:
+ gbw_file = SinglefileData(handler)
+ inputs.orca.file = {"gbw": gbw_file}
+ inputs.orca.parameters = add_orca_wf_guess(inputs.orca.parameters)
+ for i in self.ctx.wigner_structures.get_stepids():
+ inputs.orca.structure = pick_structure_from_trajectory(
+ self.ctx.wigner_structures, Int(i)
+ )
+ calc = self.submit(OrcaBaseWorkChain, **inputs)
+ calc.label = "wigner-excitation"
+ self.to_context(wigner_calcs=append_(calc))
+
+ def optimize(self):
+ """Optimize geometry"""
+ inputs = self.exposed_inputs(
+ RobustOptimizationWorkChain, namespace="opt", agglomerate=False
+ )
+ inputs.orca.structure = self.inputs.structure
+ inputs.orca.code = self.inputs.code
+
+ calc_opt = self.submit(RobustOptimizationWorkChain, **inputs)
+ calc_opt.label = "optimization"
+ return ToContext(calc_opt=calc_opt)
+
+ def inspect_optimization(self):
+ """Check whether optimization succeeded"""
+ if not self.ctx.calc_opt.is_finished_ok:
+ self.report("Optimization failed :-(")
+ return self.exit_codes.ERROR_OPTIMIZATION_FAILED
+ self.out_many(
+ self.exposed_outputs(
+ self.ctx.calc_opt,
+ RobustOptimizationWorkChain,
+ namespace="opt",
+ agglomerate=False,
+ )
+ )
+
+ def extract_transitions_from_orca_output(self, orca_output_params):
+ return {
+ "oscillator_strengths": orca_output_params["etoscs"],
+ # Orca returns excited state energies in cm^-1
+ # Perhaps we should do the conversion here,
+ # to make this less ORCA specific.
+ "excitation_energies_cm": orca_output_params["etenergies"],
+ }
+
+ def inspect_excitation(self):
+ """Check whether excitation succeeded"""
+ if not self.ctx.calc_exc.is_finished_ok:
+ self.report("Single point excitation failed :-(")
+ return self.exit_codes.ERROR_EXCITATION_FAILED
+
+ transitions = self.extract_transitions_from_orca_output(
+ self.ctx.calc_exc.outputs.output_parameters
+ )
+ self.out("single_point_excitations", Dict(transitions).store())
+
+ def inspect_wigner_excitation(self):
+ """Check whether all wigner excitations succeeded"""
+ for calc in self.ctx.wigner_calcs:
+ if not calc.is_finished_ok:
+ self.report("Wigner excitation failed :-(")
+ return self.exit_codes.ERROR_EXCITATION_FAILED
+
+ def should_optimize(self):
+ return self.inputs.optimize.value
+
+ def should_run_wigner(self):
+ return self.should_optimize() and self.inputs.nwigner > 0
+
+ def results(self):
+ """Expose results from child workchains"""
+
+ if self.should_optimize():
+ self.out("relaxed_structure", self.ctx.calc_opt.outputs.relaxed_structure)
+
+ if self.should_run_wigner():
+ all_wigner_data = [
+ self.extract_transitions_from_orca_output(wc.outputs.output_parameters)
+ for wc in self.ctx.wigner_calcs
+ ]
+ self.out("wigner_excitations", List(all_wigner_data).store())
+
+
+class AtmospecWorkChain(WorkChain):
+ """The top-level ATMOSPEC workchain"""
+
+ def _build_process_label(self):
+ return "ATMOSPEC workflow"
+
+ @classmethod
+ def define(cls, spec):
+ super().define(spec)
+ spec.expose_inputs(OrcaWignerSpectrumWorkChain, exclude=["structure"])
+ spec.input("structure", valid_type=TrajectoryData)
+
+ spec.output(
+ "spectrum_data",
+ valid_type=List,
+ required=True,
+ help="All data necessary to construct spectrum in SpectrumWidget",
+ )
+
+ spec.output(
+ "relaxed_structures",
+ valid_type=TrajectoryData,
+ required=False,
+ help="Minimized structures of all conformers",
+ )
+
+ spec.outline(
+ cls.launch,
+ cls.collect,
+ )
+
+ def launch(self):
+ inputs = self.exposed_inputs(OrcaWignerSpectrumWorkChain, agglomerate=False)
+ self.report(
+ f"Launching ATMOSPEC for {len(self.inputs.structure.get_stepids())} conformers"
+ )
+ for conf_id in self.inputs.structure.get_stepids():
+ inputs.structure = self.inputs.structure.get_step_structure(conf_id)
+ workflow = self.submit(OrcaWignerSpectrumWorkChain, **inputs)
+ workflow.label = f"atmospec-conf-{conf_id}"
+ self.to_context(confs=append_(workflow))
+
+ def collect(self):
+ for wc in self.ctx.confs:
+ if not wc.is_finished_ok:
+ return ExitCode(wc.exit_status, wc.exit_message)
+
+ conf_outputs = [wc.outputs for wc in self.ctx.confs]
+
+ # Combine all spectra data
+ if self.inputs.optimize and self.inputs.nwigner > 0:
+ data = {
+ str(i): outputs.wigner_excitations
+ for i, outputs in enumerate(conf_outputs)
+ }
+ else:
+ data = {
+ str(i): [outputs.single_point_excitations.get_dict()]
+ for i, outputs in enumerate(conf_outputs)
+ }
+ all_results = run(ConcatInputsToList, ns=data)
+ self.out("spectrum_data", all_results["output"])
+
+ # Combine all optimized geometries into single TrajectoryData
+ if self.inputs.optimize:
+ relaxed_structures = {}
+ orca_output_params = {}
+ for i, outputs in enumerate(conf_outputs):
+ relaxed_structures[f"struct_{i}"] = outputs.relaxed_structure
+ orca_output_params[f"params_{i}"] = outputs.opt.output_parameters
+
+ # For multiple conformers, we're appending relative energies and Boltzmann weights
+ array_data = None
+ if len(self.ctx.confs) > 1:
+ array_data = extract_trajectory_arrays(**orca_output_params)
+
+ trajectory = structures_to_trajectory(
+ arrays=array_data, **relaxed_structures
+ )
+ self.out("relaxed_structures", trajectory)
diff --git a/aiidalab_ispg/workflows/harmonic_wigner.py b/aiidalab_ispg/workflows/harmonic_wigner.py
new file mode 100644
index 0000000..866c230
--- /dev/null
+++ b/aiidalab_ispg/workflows/harmonic_wigner.py
@@ -0,0 +1,35 @@
+"""AiiDA workflow for generating harmonic wigner sampling"""
+
+from aiida.engine import calcfunction
+from aiida.orm import StructureData, TrajectoryData
+from aiidalab_ispg.wigner import Wigner
+
+__all__ = [
+ "generate_wigner_structures",
+]
+
+
+@calcfunction
+def generate_wigner_structures(
+ minimum_structure: StructureData,
+ orca_output_dict: dict,
+ nsample: int,
+ low_freq_thr: float,
+):
+ seed = orca_output_dict.extras["_aiida_hash"]
+ ase_molecule = minimum_structure.get_ase()
+ frequencies = orca_output_dict["vibfreqs"]
+ normal_modes = orca_output_dict["vibdisps"]
+
+ wigner = Wigner(
+ ase_molecule,
+ frequencies,
+ normal_modes,
+ seed=seed,
+ low_freq_thr=low_freq_thr.value,
+ )
+
+ wigner_list = [
+ StructureData(ase=wigner.get_ase_sample()) for i in range(nsample.value)
+ ]
+ return TrajectoryData(structurelist=wigner_list)
diff --git a/aiidalab_ispg/workflows/optimization.py b/aiidalab_ispg/workflows/optimization.py
index ccb6534..267f726 100644
--- a/aiidalab_ispg/workflows/optimization.py
+++ b/aiidalab_ispg/workflows/optimization.py
@@ -1,9 +1,6 @@
-# AiiDA workflows dealing with optimization of molecules.
+"""AiiDA workflows for optimization of molecules."""
-import math
-import numpy as np
-
-from aiida.engine import WorkChain, calcfunction
+from aiida.engine import WorkChain
from aiida.engine import (
append_,
ExitCode,
@@ -12,84 +9,18 @@
)
from aiida.plugins import WorkflowFactory, DataFactory
+from .utils import structures_to_trajectory, extract_trajectory_arrays
+
StructureData = DataFactory("core.structure")
TrajectoryData = DataFactory("core.array.trajectory")
-Array = DataFactory("core.array")
Code = DataFactory("core.code.installed")
OrcaBaseWorkChain = WorkflowFactory("orca.base")
-AUtoEV = 27.2114386245
-AUtoKCAL = 627.04
-KCALtoKJ = 4.183
-AUtoKJ = AUtoKCAL * KCALtoKJ
-EVtoKJ = AUtoKCAL * KCALtoKJ / AUtoEV
-
-# TODO: Switch to variadic arguments (supported since AiiDA 2.3)
-@calcfunction
-def structures_to_trajectory(arrays: Array = None, **structures) -> TrajectoryData:
- """Concatenate a list of StructureData to TrajectoryData
-
- Optionally, set additional data as Arrays.
- """
- traj = TrajectoryData(list(structures.values()))
- if arrays is None:
- return traj
-
- for name in arrays.get_arraynames():
- traj.set_array(name, arrays.get_array(name))
- # Copy over extras as well, except for private ones like '_aiida_hash'
- extras = arrays.base.extras.all
- for key in list(extras.keys()):
- if key.startswith("_"):
- del extras[key]
- traj.base.extras.set_many(extras)
- return traj
-
-
-def calc_boltzmann_weights(energies: list, T: float):
- """Compute Boltzmann weights for a list of energies.
-
- param energies: list of energies / kJ per mole
- param T: temperature / Kelvin
- returns: Boltzmann weights as numpy array
- """
- # Molar gas constant, Avogadro times Boltzmann
- R = 8.3144598
- RT = R * T
- E0 = min(energies)
- weights = [math.exp(-(1000 * (E - E0)) / RT) for E in energies]
- Q = sum(weights)
- return np.array([weight / Q for weight in weights])
-
-
-@calcfunction
-def extract_trajectory_arrays(**orca_output_parameters) -> Array:
- """Extract Gibbs energies and other useful stuff from the list
- of ORCA output parameter dictionaries.
-
- Return Array node, which will be appended to TrajectoryData node.
- """
- gibbs_energies = np.array(
- [params["freeenergy"] for params in orca_output_parameters.values()]
- )
- en0 = min(gibbs_energies)
- relative_gibbs_energies_kj = AUtoKJ * (gibbs_energies - en0)
-
- temperature = list(orca_output_parameters.values())[0]["temperature"]
-
- boltzmann_weights = calc_boltzmann_weights(relative_gibbs_energies_kj, temperature)
-
- en = Array()
- en.set_array("gibbs_energies_au", gibbs_energies)
- en.set_array("relative_gibbs_energies_kj", relative_gibbs_energies_kj)
- en.set_array("boltzmann_weights", boltzmann_weights)
- en.set_extra("temperature", temperature)
-
- # For the TrajectoryData viewer compatibility
- en.set_array("energies", relative_gibbs_energies_kj)
- en.set_extra("energy_units", "kJ/mole")
- return en
+__all__ = [
+ "RobustOptimizationWorkChain",
+ "ConformerOptimizationWorkChain",
+]
# TODO: For now this is just a plain optimization,
@@ -120,7 +51,7 @@ def define(cls, spec):
def handle_imaginary_frequencies(self, calculation):
"""Check successfull optimization for imaginary frequencies."""
frequencies = calculation.outputs.output_parameters["vibfreqs"]
- vibrational_displacements = calculation.outputs.output_parameters["vibdisps"]
+ # vibdisp = calculation.outputs.output_parameters["vibdisps"]
n_imag_freq = len(list(filter(lambda x: x <= 0, frequencies)))
# TODO: Check that nfreq is 3N-6 or 3N-5!
if n_imag_freq > 0:
@@ -129,7 +60,9 @@ def handle_imaginary_frequencies(self, calculation):
)
self.report(f"All frequencies (cm^-1): {frequencies}")
# TODO: Displace optimized geometry along the imaginary normal modes.
- # self.ctx.inputs.orca.structure = self.distort_structure(self.ctx.outputs.relaxed_structure, frequencies, vibrational_displacements)
+ # self.ctx.inputs.orca.structure =
+ # self.distort_structure(self.ctx.outputs.relaxed_structure,
+ # frequencies, vibdisp)
# Note: By default there are maximum 5 restarts in the BaseRestartWorkChain, which seems reasonable
# return ProcessHandlerReport(do_break=True)
return ProcessHandlerReport(
@@ -140,7 +73,7 @@ def handle_imaginary_frequencies(self, calculation):
class ConformerOptimizationWorkChain(WorkChain):
"""Top-level workchain for optimization of molecules in Orca.
- Essentially, this is a thin wrapper workchain around RobustOptimizationWorkChain
+ Essentially, this is a "thin" wrapper workchain around RobustOptimizationWorkChain
to support optimization of multiple conformers in parallel.
"""
@@ -151,7 +84,6 @@ def _build_process_label(self) -> str:
def define(cls, spec):
super().define(spec)
spec.expose_inputs(RobustOptimizationWorkChain, exclude=["orca.structure"])
- # TODO: Rename to trajectory?
spec.input("structure", valid_type=TrajectoryData)
spec.output(
@@ -167,9 +99,6 @@ def define(cls, spec):
cls.collect_optimized_conformers,
)
- # Very generic error now
- spec.exit_code(300, "CONFORMER_ERROR", "Conformer optimization failed")
-
def launch_conformer_optimization(self):
inputs = self.exposed_inputs(RobustOptimizationWorkChain, agglomerate=False)
nconf = len(self.inputs.structure.get_stepids())
@@ -182,10 +111,9 @@ def launch_conformer_optimization(self):
def inspect_conformer_optimization(self):
"""Check whether all optimizations succeeded"""
- # TODO: Specialize errors. Can we expose errors from child workflows?
for wc in self.ctx.confs:
if not wc.is_finished_ok:
- return self.exit_codes.CONFORMER_ERROR
+ return ExitCode(wc.exit_status, wc.exit_message)
def collect_optimized_conformers(self):
"""Combine all optimized geometries into single TrajectoryData"""
diff --git a/aiidalab_ispg/workflows/utils.py b/aiidalab_ispg/workflows/utils.py
new file mode 100644
index 0000000..451a6fd
--- /dev/null
+++ b/aiidalab_ispg/workflows/utils.py
@@ -0,0 +1,134 @@
+"""Small utility workflows and functions"""
+import math
+import numpy as np
+
+from aiida.engine import WorkChain, calcfunction
+from aiida.orm import (
+ ArrayData,
+ Dict,
+ List,
+ StructureData,
+ TrajectoryData,
+)
+
+__all__ = [
+ "add_orca_wf_guess",
+ "ConcatInputsToList",
+ "pick_structure_from_trajectory",
+ "structures_to_trajectory",
+ "extract_trajectory_arrays",
+]
+
+AUtoEV = 27.2114386245
+AUtoKCAL = 627.04
+KCALtoKJ = 4.183
+AUtoKJ = AUtoKCAL * KCALtoKJ
+EVtoKJ = AUtoKCAL * KCALtoKJ / AUtoEV
+
+
+# Meta WorkChain for combining all inputs from a dynamic namespace into List.
+# Used to combine outputs from several subworkflows into one output.
+# It should be launched via run() instead of submit()
+# NOTE: The code has special handling for Dict nodes,
+# which otherwise fail with not being serializable,
+# so we need the get the value with Dict.get_dict() first.
+# We should check whether this is still needed in aiida-2.0
+# Note we cannot make this more general since List and Dict
+# don't have the .value attribute.
+# https://github.com/aiidateam/aiida-core/issues/5313
+class ConcatInputsToList(WorkChain):
+ @classmethod
+ def define(cls, spec):
+ super().define(spec)
+ spec.input_namespace("ns", dynamic=True)
+ spec.output("output", valid_type=List)
+ spec.outline(cls.combine)
+
+ def combine(self):
+ input_list = [
+ self.inputs.ns[k].get_dict()
+ if isinstance(self.inputs.ns[k], Dict)
+ else self.inputs.ns[k]
+ for k in self.inputs.ns
+ ]
+ self.out("output", List(list=input_list).store())
+
+
+@calcfunction
+def pick_structure_from_trajectory(trajectory: TrajectoryData, index) -> StructureData:
+ return trajectory.get_step_structure(index.value)
+
+
+@calcfunction
+def add_orca_wf_guess(orca_params: Dict) -> Dict:
+ params = orca_params.get_dict()
+ params["input_keywords"].append("MOREAD")
+ params["input_blocks"]["scf"]["moinp"] = '"aiida_old.gbw"'
+ return Dict(params)
+
+
+# TODO: Switch to variadic arguments (supported since AiiDA 2.3)
+@calcfunction
+def structures_to_trajectory(arrays: ArrayData = None, **structures) -> TrajectoryData:
+ """Concatenate a list of StructureData to TrajectoryData
+
+ Optionally, set additional data as Arrays.
+ """
+ traj = TrajectoryData(list(structures.values()))
+ if arrays is None:
+ return traj
+
+ for name in arrays.get_arraynames():
+ traj.set_array(name, arrays.get_array(name))
+ # Copy over extras as well, except for private ones like '_aiida_hash'
+ extras = arrays.base.extras.all
+ for key in list(extras.keys()):
+ if key.startswith("_"):
+ del extras[key]
+ traj.base.extras.set_many(extras)
+ return traj
+
+
+def calc_boltzmann_weights(energies: list, T: float):
+ """Compute Boltzmann weights for a list of energies.
+
+ param energies: list of energies / kJ per mole
+ param T: temperature / Kelvin
+ returns: Boltzmann weights as numpy array
+ """
+ # Molar gas constant, Avogadro times Boltzmann
+ R = 8.3144598
+ RT = R * T
+ E0 = min(energies)
+ weights = [math.exp(-(1000 * (E - E0)) / RT) for E in energies]
+ Q = sum(weights)
+ return np.array([weight / Q for weight in weights])
+
+
+@calcfunction
+def extract_trajectory_arrays(**orca_output_parameters) -> ArrayData:
+ """Extract Gibbs energies and other useful stuff from the list
+ of ORCA output parameter dictionaries.
+
+ Return ArrayData node, which will be appended to TrajectoryData node.
+ """
+ gibbs_energies = np.array(
+ [params["freeenergy"] for params in orca_output_parameters.values()]
+ )
+ en0 = min(gibbs_energies)
+ relative_gibbs_energies_kj = AUtoKJ * (gibbs_energies - en0)
+
+ temperature = list(orca_output_parameters.values())[0]["temperature"]
+
+ boltzmann_weights = calc_boltzmann_weights(relative_gibbs_energies_kj, temperature)
+
+ en = ArrayData()
+ en.set_array("gibbs_energies_au", gibbs_energies)
+ en.set_array("relative_gibbs_energies_kj", relative_gibbs_energies_kj)
+ en.set_array("boltzmann_weights", boltzmann_weights)
+ en.set_extra("temperature", temperature)
+
+ # For the TrajectoryData viewer compatibility
+ en.set_array("energies", relative_gibbs_energies_kj)
+ en.set_extra("energy_units", "kJ/mole")
+ return en
diff --git a/pyproject.toml b/pyproject.toml
index a6db0ed..80fd2a0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,6 +7,38 @@ requires = [
]
build-backend = "setuptools.build_meta"
+[tool.ruff]
+# Enable pyflakes and pyf-builtins, pyflakes, f=bugbear
+select = [
+ "A", # flake8-builtins
+ "B", # flake8-bugbear
+ "E", # pycodestyle
+ "F", # pyflakes
+ "C90", # McCabe code complexity
+ "UP", # pyupgrade
+ "S", # bandit
+ "C4", # comprehensiosn
+ "EM", # errormsg
+ "ISC", # implicit concatenation
+ "ICN", # import convention
+ "INP", # no implicite namespace package
+ "PIE", #
+ "PT", # pytest style
+ "PTH",
+ # "PL", # pylint, for now disabled
+ # "PLR", # pylint refactor
+ "PLC", "PLE", "PLW",
+ "RUF", # ruff
+]
+line-length = 120
+src = ["aiidalab_ispg", "tests"]
+target-version = "py39"
+
+# Never enforce `E501` (line length violations).
+# TODO: Remove all asserts from the codebase and enable this rule
+# Use of assert detected (S101)
+ignore = ["E501", "S101", "PT004", "PTH123"]
+
[tool.bumpver]
current_version = "0.2.0a3"
version_pattern = "MAJOR.MINOR.PATCH[PYTAGNUM]"
diff --git a/setup.cfg b/setup.cfg
index 89e1060..3ee967a 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -43,17 +43,6 @@ aiidalab_ispg.app.static =
*.jinja
*.css
-[flake8]
-ignore =
- # Line length handled by black.
- E501
- # Line break before binary operator, preferred formatting for black.
- W503
- # Whitespace before ':', preferred formatting for black.
- E203
- N803
- N806
-
[options.entry_points]
aiida.workflows =
ispg.atmospec = aiidalab_ispg.workflows:AtmospecWorkChain
diff --git a/tests/app/conftest.py b/tests/app/conftest.py
index 8104c5a..d1fbca4 100644
--- a/tests/app/conftest.py
+++ b/tests/app/conftest.py
@@ -1,3 +1,4 @@
+# ruff: noqa: INP001
import os
import time
from pathlib import Path
@@ -13,7 +14,7 @@
def is_responsive(url):
try:
- response = requests.get(url)
+ response = requests.get(url, timeout=200)
if response.status_code == 200:
return True
except ConnectionError:
@@ -22,7 +23,7 @@ def is_responsive(url):
@pytest.fixture(scope="session")
def docker_compose_file(pytestconfig):
- return os.path.join(str(pytestconfig.rootdir), "tests/app", "docker-compose.yml")
+ return pytestconfig.rootdir / "tests" / "app" / "docker-compose.yml"
@pytest.fixture(scope="session")
@@ -76,7 +77,7 @@ def notebook_service(docker_ip, docker_services, aiidalab_exec, nb_user, appdir)
return url, token
-@pytest.fixture(scope="function")
+@pytest.fixture()
def selenium_driver(selenium, notebook_service):
def _selenium_driver(nb_path, wait_time=5.0):
url, token = notebook_service
@@ -97,7 +98,7 @@ def _selenium_driver(nb_path, wait_time=5.0):
return _selenium_driver
-@pytest.fixture(scope="function")
+@pytest.fixture()
def generate_mol_from_smiles(selenium):
def _generate_mol(smiles):
smiles_input = selenium.find_element(By.XPATH, "//input[@placeholder='C=C']")
@@ -113,7 +114,7 @@ def _generate_mol(smiles):
return _generate_mol
-@pytest.fixture(scope="function")
+@pytest.fixture()
def check_atoms(selenium):
"""Check that we can select atoms in a molecule given atom symbols."""
@@ -143,7 +144,7 @@ def _select_atoms(atom_symbols: str):
return _select_atoms
-@pytest.fixture
+@pytest.fixture()
def button_enabled(selenium):
def _button_enabled(button_title):
WebDriverWait(selenium, 15).until(
@@ -157,7 +158,7 @@ def _button_enabled(button_title):
return _button_enabled
-@pytest.fixture
+@pytest.fixture()
def button_disabled(selenium):
def _button_disabled(button_title):
WebDriverWait(selenium, 15).until(
@@ -171,15 +172,15 @@ def _button_disabled(button_title):
@pytest.fixture(scope="session")
def screenshot_dir():
- sdir = Path.joinpath(Path.cwd(), "screenshots")
+ sdir = Path.cwd() / "screenshots"
try:
- os.mkdir(sdir)
+ Path.mkdir(sdir)
except FileExistsError:
pass
return sdir
-@pytest.fixture
+@pytest.fixture()
def final_screenshot(request, screenshot_dir, selenium):
"""Take screenshot at the end of the test.
Screenshot name is generated from the test function name
@@ -191,13 +192,13 @@ def final_screenshot(request, screenshot_dir, selenium):
selenium.get_screenshot_as_file(screenshot_path)
-@pytest.fixture
+@pytest.fixture()
def firefox_options(firefox_options):
firefox_options.add_argument("--headless")
return firefox_options
-@pytest.fixture
+@pytest.fixture()
def chrome_options(chrome_options):
chrome_options.add_argument("--headless")
return chrome_options
diff --git a/tests/app/test_app.py b/tests/app/test_app.py
index a489fd4..da0e204 100755
--- a/tests/app/test_app.py
+++ b/tests/app/test_app.py
@@ -1,5 +1,5 @@
+# ruff: noqa: INP001
import requests
-import time
from enum import Enum
from pathlib import Path
@@ -13,6 +13,7 @@
WINDOW_WIDTH = 1400
WINDOW_HEIGHT = 1250
+
# Copied over from aiidalab_widgets_base/wizard.py
class StepState(Enum):
"""Wizzard step state"""
@@ -25,14 +26,14 @@ class StepState(Enum):
FAIL = -1 # the step has unrecoverably failed
-@pytest.fixture
+@pytest.fixture()
def check_step_status(selenium):
ICONS = {
StepState.INIT: "○",
StepState.READY: "◎",
StepState.CONFIGURED: "●",
StepState.SUCCESS: "✓",
- StepState.FAIL: "×",
+ StepState.FAIL: "×", # noqa: RUF001
# The ACTIVE state is "animated", see aiidalab_widgets_base/wizard.py,
# hence we cannot use it in tests.
# WizardAppWidgetStep.State.ACTIVE: ["\u25dc", "\u25dd", "\u25de", "\u25df"],
@@ -47,14 +48,14 @@ def _check_step_status(step_num, expected_state: StepState):
return _check_step_status
-@pytest.mark.tryfirst
+@pytest.mark.tryfirst()
def test_post_install(notebook_service, aiidalab_exec, nb_user, appdir):
aiidalab_exec("./post_install", workdir=appdir, user=nb_user)
def test_notebook_service_available(notebook_service):
url, token = notebook_service
- response = requests.get(f"{url}/?token={token}")
+ response = requests.get(f"{url}/?token={token}", timeout=100)
assert response.status_code == 200
@@ -157,7 +158,7 @@ def test_optimization_steps(
button_disabled("Confirm")
button_disabled("Submit")
- submit = WebDriverWait(driver, 10).until(
+ WebDriverWait(driver, 10).until(
EC.visibility_of_element_located((By.XPATH, "//button[text()='Submit']"))
)
@@ -221,7 +222,7 @@ def test_atmospec_steps(
# Make sure the submit button is visible. It should not be clickable since
# ORCA is not installed.
- submit = WebDriverWait(driver, 10).until(
+ WebDriverWait(driver, 10).until(
EC.visibility_of_element_located((By.XPATH, "//button[text()='Submit']"))
)
button_disabled("Submit")
diff --git a/tests/wigner/tst_wigner.py b/tests/wigner/tst_wigner.py
index 27f3dc9..72004e1 100755
--- a/tests/wigner/tst_wigner.py
+++ b/tests/wigner/tst_wigner.py
@@ -1,7 +1,8 @@
#!/usr/bin/env python3
+# ruff: noqa
import ase
-import aiidalab_atmospec_workchain.wigner as wigner
+from aiidalab_atmospec_workchain.wigner import wigner
from aiidalab_atmospec_workchain.wigner import ANG_TO_BOHR
LOW_FREQ_THR = 200