Skip to content

Commit

Permalink
First working version of SSP read routine
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas Laurent committed Dec 17, 2024
1 parent ac73d92 commit 850f1ba
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 60 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ tests/containers/bouncing_ball/bouncing.json
tests/containers/arch/hierarchical.fmu
tests/containers/arch/flat.fmu
tests/containers/arch/reversed.fmu
tests/containers/ssp/bouncing.fmu
tests/containers/ssp/bouncing
42 changes: 34 additions & 8 deletions fmu_manipulation_toolbox/assembly.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import csv
import json
import logging
import os
from typing import *
from pathlib import Path
import uuid
import xml.parsers.expat
import zipfile

from .fmu_container import FMUContainer, FMUContainerError
from .ssp import SSP

logger = logging.getLogger("fmu_manipulation_toolbox")

Expand Down Expand Up @@ -81,7 +81,7 @@ def make_fmu(self, fmu_directory: Path, debug=False, description_pathname=None):

container = FMUContainer(self.name, fmu_directory, description_pathname=description_pathname)

for fmu_name in self.fmu_names_list:
for fmu_name in sorted(self.fmu_names_list):
container.get_fmu(fmu_name)

for port, source in self.input_ports.items():
Expand All @@ -108,7 +108,7 @@ def make_fmu(self, fmu_directory: Path, debug=False, description_pathname=None):

for node in self.children:
logger.info(f"Deleting transient FMU Container '{node.name}'")
os.remove(fmu_directory / node.name)
(fmu_directory / node.name).unlink()


class AssemblyError(Exception):
Expand All @@ -122,21 +122,25 @@ def __repr__(self):
class Assembly:
def __init__(self, filename: str, step_size = None, auto_link: bool = True, auto_input: bool = True,
auto_output: bool = True, mt: bool = False, profiling: bool = False, fmu_directory: Path = "."):

self.filename = Path(filename)
self.default_auto_input = auto_input
self.default_auto_output = auto_output
self.default_step_size = step_size
self.default_auto_link = auto_link
self.default_mt = mt
self.default_profiling = profiling
self.fmu_directory = fmu_directory
self.filename = Path(filename)
self.transient_filenames: List[Path] = []

if not fmu_directory.is_dir():
raise FMUContainerError(f"FMU directory is not valid: '{fmu_directory}'")

self.root = self.read()

def __del__(self):
for filename in self.transient_filenames:
filename.unlink()

def read(self) -> AssemblyNode:
logger.info(f"Reading '{self.filename}'")
if self.filename.suffix == ".json":
Expand Down Expand Up @@ -346,9 +350,31 @@ def json_decode_node(self, data) -> AssemblyNode:
return node

def read_ssp(self) -> AssemblyNode:
logger.info(f"Building FMU Container from '{self.filename}'")
logger.warning("This feature is ALPHA stage.")
ssp = SSP(self.fmu_directory, self.filename)
name = str(self.filename.with_suffix(".fmu"))
root = AssemblyNode(name, step_size=self.default_step_size, auto_link=self.default_auto_link,
mt=self.default_mt, profiling=self.default_profiling, auto_input=self.default_auto_input,
auto_output=self.default_auto_output)
def start_element(name, attrs):
if name == 'ssd:Connection':
root.add_link(attrs['startElement'] + '.fmu', attrs['startConnector'],
attrs['endElement'] + '.fmu', attrs['endConnector'])

with zipfile.ZipFile(self.fmu_directory / self.filename) as zin:
for file in zin.filelist:
target_filename = Path(file.filename).name
if file.filename.endswith(".fmu"): # Extract all FMUs into the fmu_directory
zin.getinfo(file.filename).filename = target_filename
zin.extract(file, path=self.fmu_directory)
logger.debug(f"Extraction {file.filename}")
self.transient_filenames.append(self.fmu_directory / file.filename)
elif file.filename == "SystemStructure.ssd":
logger.debug(f"Analysing {file.filename}")
with zin.open(file) as file_handle:
parser = xml.parsers.expat.ParserCreate()
parser.StartElementHandler = start_element
parser.ParseFile(file_handle)
return root

def make_fmu(self, debug=False):
self.root.make_fmu(self.fmu_directory, debug=debug, description_pathname=self.fmu_directory / self.filename)
52 changes: 0 additions & 52 deletions fmu_manipulation_toolbox/ssp.py

This file was deleted.

0 comments on commit 850f1ba

Please sign in to comment.