Skip to content

Commit

Permalink
refactor Assembly class
Browse files Browse the repository at this point in the history
  • Loading branch information
nl78 committed Dec 17, 2024
1 parent 7b29f16 commit 737af44
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 131 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ fmu_manipulation_toolbox/resources/darwin64/client_sm.dylib
fmu_manipulation_toolbox/resources/darwin64/container.dylib
fmu_manipulation_toolbox/resources/darwin64/server_sm
tests/containers/bouncing_ball/bouncing.json
tests/containers/arch/hierarchical.fmu
tests/containers/arch/flat.fmu
244 changes: 130 additions & 114 deletions fmu_manipulation_toolbox/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ def __init__(self, from_port: Port, to_port: Port):


class AssemblyNode:
def __init__(self, name: str, step_size:float = None, mt = False, profiling = False,
auto_link = True):
def __init__(self, name: str, step_size: float = None, mt = False, profiling = False,
auto_link=True, auto_input=True, auto_output=True):
self.name = name
self.step_size = step_size
self.mt = mt
self.profiling = profiling
self.auto_link = auto_link
self.auto_input = auto_input
self.auto_output = auto_output
self.children: List[AssemblyNode] = []

self.fmu_names_list: List[str] = []
Expand Down Expand Up @@ -119,104 +121,51 @@ def __repr__(self):


class Assembly:
def __init__(self, description_pathname, root: AssemblyNode, auto_input = True, auto_output = True,
fmu_directory: Path = "."):
self.description_pathname = description_pathname
self.root = root
self.auto_input = auto_input
self.auto_output = auto_output
self.fmu_directory = Path(fmu_directory)
def __init__(self, filename: str, step_size = None, auto_link: bool = True, auto_input: bool = True,
auto_output: bool = True, mt: bool = False, profiling: bool = False, fmu_directory: Path = "."):

def write(self, description_filename: Union[str, Path]):
if description_filename.endswith(".csv"):
return self.write_csv(description_filename)
elif description_filename.endswith(".json"):
return self.write_json(description_filename)
self.default_auto_input = auto_input
self.default_auto_output = auto_output
self.default_step_size = step_size
self.default_auto_link = auto_link
self.default_mt = mt
self.default_profiling = profiling
self.fmu_directory = fmu_directory
self.filename = Path(filename)

if not fmu_directory.is_dir():
raise FMUContainerError(f"FMU directory is not valid: '{fmu_directory}'")

self.root = self.read()

def read(self) -> AssemblyNode:
logger.info(f"Reading '{self.filename}'")
if self.filename.suffix == ".json":
return self.read_json()
elif self.filename.suffix == ".ssp":
return self.read_ssp()
elif self.filename.suffix == ".csv":
return self.read_csv()
else:
logger.critical(f"Unable to write to '{description_filename}': format unsupported.")

def write_csv(self, description_filename: Union[str, Path]):
if self.root.children:
raise AssemblyError("This assembly is not flat. Cannot export to CSV file.")

with open(self.fmu_directory / description_filename, "wt") as outfile:
print("rule;from_fmu;from_port;to_fmu;to_port", file=outfile)
for fmu in self.root.fmu_names_list:
print(f"FMU;{fmu};;;", file=outfile)
for port, source in self.root.input_ports.items():
print(f"INPUT;;{source};{port.fmu_name};{port.port_name}", file=outfile)
for port, target in self.root.output_ports.items():
print(f"OUTPUT;{port.fmu_name};{port.port_name};;{target}", file=outfile)
for link in self.root.links:
print(f"LINK;{link.from_port.fmu_name};{link.from_port.port_name};"
f"{link.to_port.fmu_name};{link.to_port.port_name}", file=outfile)
for port, value in self.root.start_values.items():
print(f"START;{port.fmu_name};{port.port_name};{value};", file=outfile)
for port in self.root.drop_ports:
print(f"DROP;{port.fmu_name};{port.port_name};;", file=outfile)

def json_encode_node(self, node: AssemblyNode) -> Dict[str, Any]:
json_node = dict()
json_node["name"] = node.name
json_node["mt"] = node.mt
json_node["profiling"] = node.profiling
json_node["auto_link"] = node.auto_link
if node.step_size:
json_node["step_size"] = node.step_size

if node.children:
json_node["container"] = [ self.json_encode_node(child) for child in node.children]

if node.fmu_names_list:
json_node["fmu"] = [f"{fmu_name}" for fmu_name in node.fmu_names_list]

if node.input_ports:
json_node["input"] = [[f"{source}", f"{port.fmu_name}", "{port.port_name}"]
for port, source in node.input_ports.items()]

if node.output_ports:
json_node["output"] = [[f"{port.fmu_name}", "{port.port_name}", "{target}"]
for port, target in node.output_ports.items()]

if node.links:
json_node["link"] = [[f"{link.from_port.fmu_name}", f"{link.from_port.port_name}",
f"{link.to_port.fmu_name}", f"{link.to_port.port_name}"]
for link in node.links]

if node.start_values:
json_node["start"] = [[f"{port.fmu_name}", f"{port.port_name}", value]
for port, value in node.start_values.items()]

if node.drop_ports:
json_node["drop"] = [[f"{port.fmu_name}", f"{port.port_name}"] for port in node.drop_ports]

return json_node

def write_json(self, description_filename: Union[str, Path]):
with open(self.fmu_directory / description_filename, "wt") as file:
data = self.json_encode_node(self.root)
json.dump(data, file, indent=2)

def make_fmu(self, debug=False):
self.write_json("toto.json")
self.root.generate_fmu(self.fmu_directory, auto_input=self.auto_input, auto_output=self.auto_output,
debug=debug, description_pathname=self.fmu_directory / self.description_pathname)


class AssemblyCSV(Assembly):
def __init__(self, csv_filename: str, step_size = None, auto_link: bool = True, auto_input: bool = True,
auto_output: bool = True, mt: bool = False, profiling: bool = False, fmu_directory: Path = "."):
raise FMUContainerError(f"Not supported file format '{self.filename}")

name = str(Path(csv_filename).with_suffix(".fmu"))
root = AssemblyNode(name, step_size=step_size, auto_link=auto_link, mt=mt, profiling=profiling)
super().__init__(csv_filename, root, auto_input=auto_input, auto_output=auto_output,
fmu_directory=fmu_directory)
def write(self, filename: str):
if filename.endswith(".csv"):
return self.write_csv(filename)
elif filename.endswith(".json"):
return self.write_json(filename)
else:
logger.critical(f"Unable to write to '{filename}': format unsupported.")

logger.info(f"Reading '{csv_filename}'")
def read_csv(self) -> AssemblyNode:
name = str(self.filename.with_suffix(".fmu"))
root = AssemblyNode(name, step_size=self.default_step_size, auto_link=self.default_auto_link,
mt=self.default_mt, profiling=self.default_profiling, auto_input=self.default_auto_input,
auto_output=self.default_auto_output)

with open(fmu_directory / csv_filename) as file:
with open(self.fmu_directory / self.filename) as file:
reader = csv.reader(file, delimiter=';')
self.check_headers(reader)
self.check_csv_headers(reader)
for i, row in enumerate(reader):
if not row or row[0][0] == '#': # skip blank line of comment
continue
Expand All @@ -239,6 +188,28 @@ def __init__(self, csv_filename: str, step_size = None, auto_link: bool = True,
else:
logger.error(f"Line #{i+2}: unexpected rule '{rule}'. Line skipped.")

return root

def write_csv(self, filename: Union[str, Path]):
if self.root.children:
raise AssemblyError("This assembly is not flat. Cannot export to CSV file.")

with open(self.fmu_directory / filename, "wt") as outfile:
print("rule;from_fmu;from_port;to_fmu;to_port", file=outfile)
for fmu in self.root.fmu_names_list:
print(f"FMU;{fmu};;;", file=outfile)
for port, source in self.root.input_ports.items():
print(f"INPUT;;{source};{port.fmu_name};{port.port_name}", file=outfile)
for port, target in self.root.output_ports.items():
print(f"OUTPUT;{port.fmu_name};{port.port_name};;{target}", file=outfile)
for link in self.root.links:
print(f"LINK;{link.from_port.fmu_name};{link.from_port.port_name};"
f"{link.to_port.fmu_name};{link.to_port.port_name}", file=outfile)
for port, value in self.root.start_values.items():
print(f"START;{port.fmu_name};{port.port_name};{value};", file=outfile)
for port in self.root.drop_ports:
print(f"DROP;{port.fmu_name};{port.port_name};;", file=outfile)

@staticmethod
def _read_csv_rule(node: AssemblyNode, rule: str, from_fmu_filename: str, from_port_name: str,
to_fmu_filename: str, to_port_name: str):
Expand Down Expand Up @@ -277,33 +248,77 @@ def _read_csv_rule(node: AssemblyNode, rule: str, from_fmu_filename: str, from_p
# no else: check on rule is already done in read_description()

@staticmethod
def check_headers(reader):
def check_csv_headers(reader):
headers = next(reader)
if not headers == ["rule", "from_fmu", "from_port", "to_fmu", "to_port"]:
raise AssemblyError("Header (1st line of the file) is not well formatted.")


class AssemblyJson(Assembly):
def __init__(self, filename: str, fmu_directory: Path = "."):
with open(fmu_directory / filename) as file:
def read_json(self) -> AssemblyNode:
with open(self.fmu_directory / self.filename) as file:
try:
data = json.load(file)
except json.decoder.JSONDecodeError as e:
raise FMUContainerError(f"Cannot read json: {e}")
root = self.json_decode_node(data)
root.name = str(Path(filename).with_suffix(".fmu"))
root.name = str(self.filename.with_suffix(".fmu"))
auto_input = data.get("auto_input", True)
auto_output = data.get("auto_output", True)
super().__init__(filename, root, auto_input=auto_input, auto_output=auto_output, fmu_directory=fmu_directory)

return root

def write_json(self, filename: Union[str, Path]):
with open(self.fmu_directory / filename, "wt") as file:
data = self.json_encode_node(self.root)
json.dump(data, file, indent=2)

def json_encode_node(self, node: AssemblyNode) -> Dict[str, Any]:
json_node = dict()
json_node["name"] = node.name
json_node["mt"] = node.mt
json_node["profiling"] = node.profiling
json_node["auto_link"] = node.auto_link
if node.step_size:
json_node["step_size"] = node.step_size

if node.children:
json_node["container"] = [self.json_encode_node(child) for child in node.children]

if node.fmu_names_list:
json_node["fmu"] = [f"{fmu_name}" for fmu_name in node.fmu_names_list]

if node.input_ports:
json_node["input"] = [[f"{source}", f"{port.fmu_name}", "{port.port_name}"]
for port, source in node.input_ports.items()]

if node.output_ports:
json_node["output"] = [[f"{port.fmu_name}", "{port.port_name}", "{target}"]
for port, target in node.output_ports.items()]

if node.links:
json_node["link"] = [[f"{link.from_port.fmu_name}", f"{link.from_port.port_name}",
f"{link.to_port.fmu_name}", f"{link.to_port.port_name}"]
for link in node.links]

if node.start_values:
json_node["start"] = [[f"{port.fmu_name}", f"{port.port_name}", value]
for port, value in node.start_values.items()]

if node.drop_ports:
json_node["drop"] = [[f"{port.fmu_name}", f"{port.port_name}"] for port in node.drop_ports]

return json_node

def json_decode_node(self, data) -> AssemblyNode:
name = data.get("name", None)
step_size = data.get("step_size", None)
auto_link = data.get("auto_link", True)
mt = data.get("mt", False)
profiling = data.get("profiling", False)
step_size = data.get("step_size", self.default_step_size)
auto_link = data.get("auto_link", self.default_auto_link)
auto_input = data.get("auto_input", self.default_auto_input)
auto_output = data.get("auto_output", self.default_auto_output)
mt = data.get("mt", self.default_mt)
profiling = data.get("profiling", self.default_profiling)

node = AssemblyNode(name, step_size=step_size, auto_link=auto_link, mt=mt, profiling=profiling)
node = AssemblyNode(name, step_size=step_size, auto_link=auto_link, mt=mt, profiling=profiling,
auto_input=auto_input, auto_output=auto_output)

if "container" in data:
for sub_data in data["container"]:
Expand All @@ -330,12 +345,13 @@ def json_decode_node(self, data) -> AssemblyNode:
node.add_drop_port(line[0], line[1])

return node


class AssemblySSP(Assembly):
def __init__(self, ssp_filename: str, auto_link: bool = True, auto_input: bool = True, auto_output: bool = True,
mt: bool = False, profiling: bool = False, fmu_directory: str = "."):
logger.info(f"Building FMU Container from '{ssp_filename}'")

def read_ssp(self) -> AssemblyNode:
logger.info(f"Building FMU Container from '{self.filename}'")
logger.warning("This feature is ALPHA stage.")
ssp = SSP(self.fmu_directory, ssp_filename)
container = FMUContainer(ssp_filename.stem, self.fmu_directory)
ssp = SSP(self.fmu_directory, self.filename)

def make_fmu(self, debug=False):
self.write_json("toto.json")
self.root.generate_fmu(self.fmu_directory, auto_input=self.default_auto_input, auto_output=self.default_auto_output,
debug=debug, description_pathname=self.fmu_directory / self.filename)
22 changes: 5 additions & 17 deletions fmu_manipulation_toolbox/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from .fmu_operations import *
from .fmu_container import FMUContainerError
from .assembly import AssemblyCSV, AssemblyJson, AssemblySSP
from .assembly import Assembly
from .checker import checker_list
from .version import __version__ as version
from .help import Help
Expand Down Expand Up @@ -191,10 +191,6 @@ def fmucontainer():
logger.setLevel(logging.DEBUG)

fmu_directory = Path(config.fmu_directory)
if not fmu_directory.is_dir():
logger.fatal(f"FMU directory is not valid: '{fmu_directory}'")
return

logger.info(f"FMU directory: '{fmu_directory}'")

for description in config.container_descriptions_list:
Expand All @@ -204,19 +200,11 @@ def fmucontainer():
except ValueError:
step_size = None
filename = description

try:
if filename.endswith(".json"):
assembly = AssemblyJson(filename, fmu_directory=fmu_directory)
elif filename.endswith(".ssp"):
assembly = AssemblySSP()
elif filename.endswith(".csv"):
assembly = AssemblyCSV(filename, step_size=step_size, auto_link=config.auto_link,
auto_input=config.auto_input, auto_output=config.auto_output, mt=config.mt,
profiling=config.profiling, fmu_directory=fmu_directory)
else:
logger.fatal(f"Not supported file format '{description}")
return
assembly = Assembly(filename, step_size=step_size, auto_link=config.auto_link,
auto_input=config.auto_input, auto_output=config.auto_output, mt=config.mt,
profiling=config.profiling, fmu_directory=fmu_directory)

except FileNotFoundError as e:
logger.fatal(f"Cannot read file: {e}")
continue
Expand Down

0 comments on commit 737af44

Please sign in to comment.