-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathstore.py
283 lines (216 loc) · 8.44 KB
/
store.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# -*- coding: utf-8 -*-
"""This is the module that defines a Store class

This module details all supported functions on the Store, as described in the
specification. The Store acts as the interface between the core and the
plugin interface.

Example
-------
>>> from ditto.model import Store

Notes
-----
The Store holds all the instances of objects required for a transformation.

"""
from __future__ import absolute_import, division, print_function
from builtins import super, range, zip, round, map
import uuid
import logging
import types
from functools import partial
from .network.network import Network
from .core import DiTToBase, DiTToTypeError
from .modify.modify import Modifier
from .models.node import Node
logger = logging.getLogger(__name__)
class Store(object):
    """Hold every object that takes part in a transformation.

    A store keeps three collections and one graph helper:

    * ``cim_store`` -- dictionary mapping an element ``UUID`` to the element;
      filled by :meth:`add_element`.
    * ``model_store`` -- list of model objects.  This class only reads and
      removes entries, so the list is presumably appended to by the model
      objects themselves (TODO confirm against the model classes).
    * ``model_names`` -- dictionary backing ``store[name]`` lookups; it is
      emptied and refilled by :meth:`set_names`.
    * a ``Network`` instance to which all graph related methods delegate.

    Examples
    --------
    >>> M = ditto.Store()
    >>> M
    <...Store(elements=0, models=0) object at 0x...>

    """

    # Factory used to create the UUID -> element mapping.
    __store_factory = dict

    def __init__(self):
        """Create an empty store with an empty network."""
        self._cim_store = self.__store_factory()
        self._model_store = list()
        self._model_names = {}
        self._network = Network()

    def __repr__(self):
        """Return module, class name, element/model counts and object id."""
        return "<%s.%s(elements=%s, models=%s) object at %s>" % (
            self.__class__.__module__,
            self.__class__.__name__,
            len(self.elements),
            len(self.models),
            hex(id(self)),
        )

    def __getitem__(self, k):
        """Return the object registered under name ``k``.

        Raises
        ------
        KeyError
            If ``k`` is not present in ``model_names``.
        """
        return self._model_names[k]

    def __setitem__(self, k, v):
        """Register object ``v`` under name ``k`` in ``model_names``."""
        self._model_names[k] = v

    def iter_elements(self, type=DiTToBase):
        """Yield the stored elements that are instances of ``type``.

        Parameters
        ----------
        type : class, optional
            Class used as filter; ``None`` is treated as ``DiTToBase``.
            (The name shadows the builtin but is part of the public
            signature, so it is kept.)

        Raises
        ------
        AttributeError
            If ``type`` is not a subclass of ``DiTToBase``.  Because this is
            a generator, the error surfaces on first iteration, not at call
            time.
        """
        if type is None:
            type = DiTToBase

        if not issubclass(type, DiTToBase):
            raise AttributeError("Unable to find {} in ditto.environment".format(type))

        for e in self.elements:
            if isinstance(e, type):
                yield e

    def iter_models(self, type=None):
        """Yield the stored models that are instances of ``type``.

        Parameters
        ----------
        type : class, optional
            Class used as filter; ``None`` means ``object``, i.e. every
            model is yielded.
        """
        if type is None:
            type = object

        for m in self.models:
            if isinstance(m, type):
                yield m

    @property
    def elements(self):
        """Return a new list with the values of ``cim_store``."""
        return list(self.cim_store.values())

    @property
    def models(self):
        """Return a tuple snapshot of ``model_store``.

        Being a copy, it can be iterated safely while models are removed
        from the store.
        """
        return tuple(self.model_store)

    def remove_element(self, element):
        """Remove ``element`` from ``model_store``.

        Raises
        ------
        ValueError
            If ``element`` is not in the list (raised by ``list.remove``).
        """
        self._model_store.remove(element)

    def add_element(self, element):
        """Link ``element`` to this store and index it by its ``UUID``.

        Raises
        ------
        DiTToTypeError
            If ``element`` is not a ``DiTToBase`` instance.
        """
        if not isinstance(element, DiTToBase):
            raise DiTToTypeError(
                "element must be of type DiTToBase. Please check the documentation"
            )
        element.link_model = self
        self.cim_store[element.UUID] = element

    def set_names(self):
        """Rebuild the name lookup dictionary.

        The dictionary is reset to empty first, then ``set_name(self)`` is
        called on every model so that each object can register itself under
        its name.
        """
        self._model_names = {}
        for m in self.models:
            m.set_name(self)

    def build_networkx(self, source=None):
        """Build the network from this store and set its attributes.

        Parameters
        ----------
        source : optional
            Forwarded to ``Network.build`` only when it is not ``None``.
        """
        if source is not None:
            self._network.build(self, source)
        else:
            self._network.build(self)
        self._network.set_attributes(self)

    def print_networkx(self):
        """Ask the network to print its nodes and then its edges."""
        logger.debug("Printing Nodes...")
        self._network.print_nodes()
        logger.debug("Printing Edges...")
        self._network.print_edges()
        # NOTE: attribute printing (self._network.print_attrs()) is disabled.

    def delete_cycles(self):
        """Delete one edge from every cycle longer than two elements.

        For each cycle reported by ``Network.find_cycles`` whose length is
        greater than 2, ``Network.middle_single_phase`` supplies the name of
        the edge to drop; every model carrying that name is deleted through
        a ``Modifier``.  The network is rebuilt once at the end.

        Notes
        -----
        Per the original author's description: the graph is first converted
        to a directed graph (which doubles the edges, hence creating cycles
        of length 2); the heuristic removes the edge in the middle of the
        longest single phase section of the loop and, when there is no
        single phase section, the edge furthest from the source.
        """
        for i in self._network.find_cycles():
            if len(i) <= 2:
                continue
            logger.debug("Detected cycle %s", i)
            edge = self._network.middle_single_phase(i)
            for j in self.models:
                if hasattr(j, "name") and j.name == edge:
                    logger.debug("deleting %s", edge)
                    modifier = Modifier()
                    modifier.delete_element(self, j)
        self.build_networkx()

    def direct_from_source(self, source="sourcebus"):
        """Flip models whose direction opposes the traversal from ``source``.

        ``Network.bfs_order(source)`` is queried once.  A model with non-None
        ``from_element`` and ``to_element`` has both swapped when the pair
        ``(to_element, from_element)`` is in that result while
        ``(from_element, to_element)`` is not.
        """
        ordered_nodes = self._network.bfs_order(source)
        for i in self.models:
            has_both_ends = (
                hasattr(i, "from_element")
                and i.from_element is not None
                and hasattr(i, "to_element")
                and i.to_element is not None
            )
            if not has_both_ends:
                continue
            original = (i.from_element, i.to_element)
            flipped = (i.to_element, i.from_element)
            # The second test covers the case where lines go both ways.
            if flipped in ordered_nodes and original not in ordered_nodes:
                i.from_element, i.to_element = i.to_element, i.from_element

    def delete_disconnected_nodes(self):
        """Drop ``Node`` models that are unnamed or absent from the network.

        A node whose ``name`` is ``None`` is removed from ``model_store``
        directly.  A named node missing from ``Network.get_nodes()`` is
        deleted through a ``Modifier``.  The network is rebuilt afterwards.
        """
        for i in self.models:
            if not (isinstance(i, Node) and hasattr(i, "name")):
                continue
            if i.name is None:
                self.remove_element(i)
            elif i.name not in self._network.get_nodes():
                logger.debug("deleting %s", i.name)
                modifier = Modifier()
                modifier.delete_element(self, i)
        # Should be redundant since the networkx graph is only built on
        # connected elements.
        self.build_networkx()

    def set_node_voltages(self):
        """Copy the upstream transformer voltage onto every named node.

        Names are refreshed with :meth:`set_names` first.  For each ``Node``
        with a non-None name, the transformer returned by
        ``Network.get_upstream_transformer`` is looked up by name in this
        store and the ``nominal_voltage`` of its last winding is assigned to
        the node.  A ``KeyError`` during that step leaves the node unchanged.
        """
        self.set_names()
        for i in self.models:
            if isinstance(i, Node) and hasattr(i, "name") and i.name is not None:
                upstream_transformer = self._network.get_upstream_transformer(
                    self, i.name
                )
                try:
                    upstream_voltage = (
                        self[upstream_transformer].windings[-1].nominal_voltage
                    )
                    i.nominal_voltage = upstream_voltage
                except KeyError:
                    # Best effort: no object registered under that name.
                    pass

    def get_internal_edges(self, nodeset):
        """Return ``Network.find_internal_edges(nodeset)``."""
        return self._network.find_internal_edges(nodeset)

    @property
    def cim_store(self):
        """Dictionary mapping element UUIDs to elements."""
        return self._cim_store

    @property
    def model_store(self):
        """List of model objects held by the store."""
        return self._model_store

    @property
    def model_names(self):
        """Dictionary mapping names to objects."""
        return self._model_names
class EnvAttributeIntercepter(object):
    """Expose one bound builder method per class published by ``model._env``.

    Parameters
    ----------
    model
        Object providing ``_env.get_clsmembers()``, which must return a
        mapping of attribute name to class.
    """

    def __init__(self, model):
        self.model = model
        self.generate_attributes()

    def generate_attributes(self):
        """Attach a builder method to this instance for every class member.

        Each builder is ``model_builder`` pre-filled with the class and the
        model; it carries the class' ``__doc__`` and ``__name__`` and is
        stored under the name returned by ``get_clsmembers``.
        """
        for function_name, klass in self.model._env.get_clsmembers().items():
            builder = partial(model_builder, klass=klass, model=self.model)
            builder.__doc__ = klass.__doc__
            builder.__name__ = klass.__name__
            # Two-argument form: valid on Python 2 and Python 3.  The
            # three-argument form (with the class) raises TypeError on
            # Python 3.
            setattr(self, function_name, types.MethodType(builder, self))
def model_builder(self, klass, model, *args, **kwargs):
    """Instantiate ``klass`` wired to the store callbacks.

    Parameters
    ----------
    self
        Receiver supplied when the function is used as a bound method; it is
        not used here.
    klass : class
        Class to instantiate.
    model
        Store handed to ``init_callback`` through a partial.
    *args
        Accepted but not forwarded to ``klass``.
    **kwargs
        Forwarded to ``klass`` unchanged.

    Returns
    -------
    The new ``klass`` instance.
    """
    # Pre-bind the model and keep the callback's original name.
    bound_init = partial(init_callback, model=model)
    bound_init.__name__ = init_callback.__name__

    return klass(
        init_callback=bound_init,
        get_callback=get_callback,
        set_callback=set_callback,
        del_callback=del_callback,
        **kwargs
    )
def init_callback(self, model, **kwargs):
    """Link an object to ``model`` and register it by UUID.

    The object gets ``link_model`` set to ``model``; when its ``UUID`` is
    ``None`` a fresh ``uuid.uuid4()`` is assigned.  The object is then stored
    in the model's ``cim_store`` under that UUID.  Extra keyword arguments
    are accepted and ignored.
    """
    self.link_model = model

    needs_identifier = self.UUID is None
    if needs_identifier:
        self.UUID = uuid.uuid4()

    registry = self.link_model.cim_store
    registry[self.UUID] = self
def get_callback(self, name, val):
    """Assert that the object is still registered in its store.

    Parameters
    ----------
    name
        Name of the attribute being read; used only in the error message.
    val
        Value of that attribute; used only in the error message.

    Raises
    ------
    AssertionError
        If ``self.UUID`` is not a key of ``self.link_model.cim_store``.
    """
    registry = self.link_model.cim_store
    # The message is only formatted when the assertion fails.
    assert self.UUID in registry, (
        "UUID {} not found in Store {}. {} attributes value is {}".format(
            self.UUID, self.link_model, name, val
        )
    )
def set_callback(self, name, value):
    """Assert that an object and the value(s) assigned to it are registered.

    Parameters
    ----------
    name
        Name of the attribute being set; not used by the checks.
    value
        Either one object exposing ``UUID`` or a tuple of such objects.

    Raises
    ------
    AssertionError
        If ``self.UUID``, or the ``UUID`` of ``value`` (of any item when
        ``value`` is a tuple), is not a key of ``self.link_model.cim_store``.
        The message names the UUID that is actually missing.
    """
    registry = self.link_model.cim_store
    assert self.UUID in registry, "{} not found in Store {}".format(
        self.UUID, self.link_model
    )

    # Treat a single value as a one-item tuple so both cases share one check.
    candidates = value if isinstance(value, tuple) else (value,)
    for candidate in candidates:
        # Report the missing value's UUID, not the owner's.
        assert candidate.UUID in registry, "{} not found in Store {}".format(
            candidate.UUID, self.link_model
        )
def del_callback(self, name, obj):
    """Deletion hook that does nothing and returns ``None``."""
    return None