-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmulpyplexer.py
160 lines (117 loc) · 4.43 KB
/
mulpyplexer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from __future__ import print_function
from functools import reduce # pylint: disable=redefined-builtin
class MP(object):
def __init__(self, items):
super(MP, self).__setattr__('mp_items', items)
@staticmethod
def _resolve_object(a, n):
return a if not isinstance(a, MP) else a.mp_items[n]
def _expand(self, o):
expanded = [ ]
for n in range(len(self.mp_items)):
if isinstance(o, dict):
e = { k:self._resolve_object(a, n) for (k,a) in o.items() }
elif isinstance(o, list):
e = [ self._resolve_object(a, n) for a in o ]
elif isinstance(o, tuple):
e = tuple(self._resolve_object(a, n) for a in o)
else:
e = self._resolve_object(o, n)
expanded.append(e)
return expanded
def __repr__(self):
return "MP(%s)" % (self.mp_items,)
#
# Plex-throughs!
#
def __getattr__(self, k):
keys = self._expand(k)
return MP([ getattr(i, k) for i,k in zip(self.mp_items,keys) ])
def __setattr__(self, k, v):
keys = self._expand(k)
values = self._expand(v)
return MP([ setattr(i, k, v) for i,k,v in zip(self.mp_items,keys,values) ])
def __call__(self, *args, **kwargs):
expanded_args, expanded_kwargs = self._expand(args), self._expand(kwargs)
return MP([ i(*a, **k) for i,a,k in zip(self.mp_items,expanded_args,expanded_kwargs) ])
def __getitem__(self, k):
keys = self._expand(k)
return MP([ i[k] for i,k in zip(self.mp_items,keys) ])
def __setitem__(self, k, v):
keys = self._expand(k)
values = self._expand(v)
return MP([ i.__setitem__(k, v) for i,k,v in zip(self.mp_items,keys,values) ])
#
# Plexionality
#
def mp_len(self):
return [ len(i) for i in self.mp_items ]
def mp_map(self, func):
return MP([ func(i) for i in self.mp_items ])
def mp_filter(self, func):
return MP([ i for i in self.mp_items if func(i) ])
def mp_flatten(self):
items = [ ]
for i in self.mp_items:
if isinstance(i, (list, tuple, set)):
items += list(i)
return MP(items)
def mp_union(self):
items = set()
for i in self.mp_items:
if isinstance(i, (list, tuple, set)):
items |= set(i)
return MP(list(items))
@property
def mp_first(self):
return self.mp_items[0]
@property
def mp_last(self):
return self.mp_items[-1]
def mp_sorted(self, key=None, reverse=False):
return MP(sorted(self.mp_items, key=key, reverse=reverse))
def mp_reduce(self, function, initial=None):
reduce_args = [ function, self.mp_items ] if initial is None else [ function, self.mp_items, initial ]
return reduce(*reduce_args)
def __dir__(self):
attrs = frozenset.intersection(*[frozenset(dir(i)) for i in self.mp_items])
return list(sorted(attrs | { 'mp_items', 'mp_len', 'mp_map', 'mp_flatten', 'mp_union', 'mp_filter', 'mp_sorted', 'mp_reduce', 'mp_first', 'mp_last' } ))
def test():
class A(object):
def __init__(self, i, h=None):
self.i = i
self.h = [] if h is None else h + [i]
def add(self, j):
return A(self.i + (j.i if isinstance(j, A) else j), self.h)
def sub(self, j):
return A(self.i - (j.i if isinstance(j, A) else j), self.h)
def __repr__(self):
return "<A %d>" % self.i
def str(self):
return str(self.i)
def __eq__(self, o):
return self.i == o.i
one = MP([ A(10), A(20), A(30) ])
# test getattr
ga = one.i
assert ga.mp_items == [ 10, 20, 30 ]
# test call
two = one.add(5)
assert two.mp_items == [ A(15), A(25), A(35) ]
three = two.sub(10)
assert three.mp_items == [ A(5), A(15), A(25) ]
four = three.add(one)
assert four.mp_items == [ A(15), A(35), A(55) ]
five = four.str()
assert five.mp_items == [ "15", "35", "55" ]
six = four.i
assert six.mp_items == [ 15, 35, 55 ]
# test setattr
four.i = one.add(5).i
assert four.i.mp_items == [ 15, 25, 35 ]
assert four.i.mp_sorted(reverse=True).mp_items == [ 35, 25, 15 ]
import operator
assert four.i.mp_reduce(operator.__add__, initial=10) == 35 + 25 + 15 + 10
print("TESTS SUCCEEDED")
if __name__ == '__main__':
test()