diff --git a/.gitignore b/.gitignore index 8ab394d..aa6b71f 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,5 @@ tests/containers/bouncing_ball/bouncing.json tests/containers/arch/hierarchical.fmu tests/containers/arch/flat.fmu tests/containers/arch/reversed.fmu +tests/containers/ssp/bouncing.fmu +tests/containers/ssp/bouncing diff --git a/fmu_manipulation_toolbox/assembly.py b/fmu_manipulation_toolbox/assembly.py index 0708b95..41d1af4 100644 --- a/fmu_manipulation_toolbox/assembly.py +++ b/fmu_manipulation_toolbox/assembly.py @@ -1,13 +1,13 @@ import csv import json import logging -import os from typing import * from pathlib import Path import uuid +import xml.parsers.expat +import zipfile from .fmu_container import FMUContainer, FMUContainerError -from .ssp import SSP logger = logging.getLogger("fmu_manipulation_toolbox") @@ -81,7 +81,7 @@ def make_fmu(self, fmu_directory: Path, debug=False, description_pathname=None): container = FMUContainer(self.name, fmu_directory, description_pathname=description_pathname) - for fmu_name in self.fmu_names_list: + for fmu_name in sorted(self.fmu_names_list): container.get_fmu(fmu_name) for port, source in self.input_ports.items(): @@ -108,7 +108,7 @@ def make_fmu(self, fmu_directory: Path, debug=False, description_pathname=None): for node in self.children: logger.info(f"Deleting transient FMU Container '{node.name}'") - os.remove(fmu_directory / node.name) + (fmu_directory / node.name).unlink() class AssemblyError(Exception): @@ -122,7 +122,7 @@ def __repr__(self): class Assembly: def __init__(self, filename: str, step_size = None, auto_link: bool = True, auto_input: bool = True, auto_output: bool = True, mt: bool = False, profiling: bool = False, fmu_directory: Path = "."): - + self.filename = Path(filename) self.default_auto_input = auto_input self.default_auto_output = auto_output self.default_step_size = step_size @@ -130,13 +130,17 @@ def __init__(self, filename: str, step_size = None, auto_link: bool = True, aut self.default_mt = mt self.default_profiling = profiling self.fmu_directory = fmu_directory - self.filename = Path(filename) + self.transient_filenames: List[Path] = [] if not fmu_directory.is_dir(): raise FMUContainerError(f"FMU directory is not valid: '{fmu_directory}'") self.root = self.read() + def __del__(self): + for filename in self.transient_filenames: + filename.unlink() + def read(self) -> AssemblyNode: logger.info(f"Reading '{self.filename}'") if self.filename.suffix == ".json": @@ -346,9 +350,31 @@ def json_decode_node(self, data) -> AssemblyNode: return node def read_ssp(self) -> AssemblyNode: - logger.info(f"Building FMU Container from '{self.filename}'") logger.warning("This feature is ALPHA stage.") - ssp = SSP(self.fmu_directory, self.filename) + name = str(self.filename.with_suffix(".fmu")) + root = AssemblyNode(name, step_size=self.default_step_size, auto_link=self.default_auto_link, + mt=self.default_mt, profiling=self.default_profiling, auto_input=self.default_auto_input, + auto_output=self.default_auto_output) + def start_element(name, attrs): + if name == 'ssd:Connection': + root.add_link(attrs['startElement'] + '.fmu', attrs['startConnector'], + attrs['endElement'] + '.fmu', attrs['endConnector']) + + with zipfile.ZipFile(self.fmu_directory / self.filename) as zin: + for file in zin.filelist: + target_filename = Path(file.filename).name + if file.filename.endswith(".fmu"): # Extract all FMUs into the fmu_directory + zin.getinfo(file.filename).filename = target_filename + zin.extract(file, path=self.fmu_directory) + logger.debug(f"Extraction {file.filename}") + self.transient_filenames.append(self.fmu_directory / file.filename) + elif file.filename == "SystemStructure.ssd": + logger.debug(f"Analysing {file.filename}") + with zin.open(file) as file_handle: + parser = xml.parsers.expat.ParserCreate() + parser.StartElementHandler = start_element + parser.ParseFile(file_handle) + return root def make_fmu(self, debug=False): self.root.make_fmu(self.fmu_directory, debug=debug, description_pathname=self.fmu_directory / self.filename) diff --git a/fmu_manipulation_toolbox/ssp.py b/fmu_manipulation_toolbox/ssp.py deleted file mode 100644 index c2f0f52..0000000 --- a/fmu_manipulation_toolbox/ssp.py +++ /dev/null @@ -1,52 +0,0 @@ -import logging -import xml.parsers.expat -import zipfile -from pathlib import Path - -logger = logging.getLogger("fmutool") - - -class SSP: - def __init__(self, fmu_directory: Path, ssp_filename: Path): - self.ssp_filename = ssp_filename - self.fmu_directory = fmu_directory - self.analyse_ssp() - - def analyse_ssp(self): - with zipfile.ZipFile(self.fmu_directory / self.ssp_filename) as zin: - for file in zin.filelist: - target_filename = Path(file.filename).name - if file.filename.endswith(".fmu"): # Extract all FMUs into the fmu_directory - zin.getinfo(file.filename).filename = target_filename - zin.extract(file, path=self.fmu_directory) - elif file.filename == "SystemStructure.ssd": - zin.getinfo(file.filename).filename = target_filename - zin.extract(file, path=self.fmu_directory) - with zin.open(file) as file_handle: - self.analyse_ssd(file_handle, target_filename) - - def analyse_ssd(self, file, filename): - logger.info(f"Find {filename}") - parser = xml.parsers.expat.ParserCreate() - parser.StartElementHandler = self.start_element - parser.EndElementHandler = self.end_element - parser.CharacterDataHandler = self.char_data - - parser.ParseFile(file) - - - def start_element(self, name, attrs): - #logger.info(f"{name} {attrs}") - pass - - def end_element(self, name): - pass - - def char_data(self, data): - pass - -class SSD: - class Component: - pass - class Unit: - pass \ No newline at end of file