-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfpgas.py
143 lines (121 loc) · 4.06 KB
/
fpgas.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import os.path
import subprocess
from types import SimpleNamespace
from CONFIG import FPGA_STATUS_DISABLED, FPGA_STATUS_ENABLED, FPGA_DESCRIPTION, FPGA_COMMAND_LIST, FPGA_COMMAND_ENABLE, FPGA_COMMAND_DISABLE, FPGA_COMMAND_ENABLE_RETRY, FPGA_COMMAND_DISABLE_RETRY
class FPGAs:
    """
    Keeps track of the FPGA boards reported by FPGA_COMMAND_LIST and
    enables/disables them through FPGA_COMMAND_ENABLE / FPGA_COMMAND_DISABLE.

    Boards are addressed by their index 'i' in `self.fpgas`. Each entry is a
    SimpleNamespace with the attributes `id`, `device_description`, `status`,
    `enabled` (True / False / None when the status is not recognised) and
    `name` (the id with the part shared by all boards stripped off).
    """

    def __init__(self):
        self.fpgas = []
        self.update()
        # Not used inside this class; kept because external code may read it.
        self.state = []

    def update(self):
        """
        Updates the state of the fpgas

        Runs FPGA_COMMAND_LIST, keeps the devices whose description equals
        FPGA_DESCRIPTION and stores them in `self.fpgas`.
        Raises whatever subprocess.check_output raises if the listing fails.
        """
        # get all
        output = subprocess.check_output(FPGA_COMMAND_LIST, universal_newlines=True)

        # process + filter
        fpgas = [
            device
            for device in self._parse_devices(output)
            if device.device_description == FPGA_DESCRIPTION
        ]

        # modify
        names = self._short_names([fpga.id for fpga in fpgas])
        for fpga, name in zip(fpgas, names):
            if fpga.status in FPGA_STATUS_ENABLED:
                fpga.enabled = True
            elif fpga.status in FPGA_STATUS_DISABLED:
                fpga.enabled = False
            else:
                # unknown status: the board can neither be enabled nor disabled
                fpga.enabled = None
            fpga.name = name

        # log
        print("FPGAs found:")
        for fpga in fpgas:
            print(">", fpga)

        self.fpgas = fpgas

    @staticmethod
    def _parse_devices(output):
        """
        Turns the text printed by the list command into device records.

        The output is expected to be blank-line separated blocks of
        'key: value' lines; the first and last block are skipped (header and
        trailer). Fields are positional: value 0 is the id, value 1 the
        description and value 5 the status.
        """
        devices = []
        for block in output.split("\n\n")[1:-1]:
            # partition keeps everything after the FIRST colon, so values that
            # contain a colon are not truncated, and lines without a colon
            # yield '' instead of raising.
            values = [line.partition(":")[2].strip() for line in block.split("\n")]
            if len(values) < 6:
                # not a complete device record; skip it rather than crash
                continue
            devices.append(SimpleNamespace(
                id=values[0],
                device_description=values[1],
                status=values[5],
            ))
        return devices

    @staticmethod
    def _short_names(ids):
        """
        Returns one display name per id: the id without the prefix and suffix
        shared by all ids. A single id is returned unchanged.
        """
        if len(ids) <= 1:
            return list(ids)
        prefix = len(os.path.commonprefix(ids))
        suffix = len(os.path.commonprefix([ident[::-1] for ident in ids]))
        names = []
        for ident in ids:
            # Use an explicit end index: 'ident[prefix:-suffix]' would return
            # '' when suffix is 0, because -0 == 0.
            trimmed = ident[prefix:len(ident) - suffix]
            # Fall back to the full id when trimming leaves nothing
            # (identical ids, or overlapping prefix and suffix).
            names.append(trimmed or ident)
        return names

    @staticmethod
    def _run_with_retry(template, device_id, retries):
        """
        Runs the command 'template % device_id' up to 'retries' times.
        Returns True as soon as one attempt succeeds, False otherwise.
        """
        for _ in range(retries):
            try:
                print(subprocess.check_call(template % device_id))
            except Exception as error:
                # Best-effort retry boundary: report the reason and try again
                # instead of silently discarding it.
                print("Command failed:", error)
            else:
                return True
        return False

    def enabled(self, i):
        """
        returns true if board 'i' is enabled, false if isn't, None if undefined/error
        """
        return self.fpgas[i].enabled

    def allEnabled(self):
        """
        returns true when no board is known to be disabled
        (boards in the undefined state are ignored; true for zero boards)
        """
        return all(self.enabled(i) is not False for i in self)

    def allDisabled(self):
        """
        returns true when no board is known to be enabled
        (boards in the undefined state are ignored; true for zero boards)
        """
        return all(self.enabled(i) is not True for i in self)

    def name(self, i):
        """
        returns the name of board 'i'
        """
        return self.fpgas[i].name

    def id(self, i):
        """
        Returns the id of board 'i'
        """
        return self.fpgas[i].id

    def toggle(self, i, state=None):
        """
        Sets the state of board 'i'
        Call without 'state' to toggle
        Does nothing when the current state of the board is undefined
        """
        current = self.enabled(i)
        if current is None:
            return
        if state is None:
            state = not current
        if state:
            self.enable(i)
        else:
            self.disable(i)

    def enable(self, i):
        """
        Enables board 'i'

        Does nothing unless the board is currently known to be disabled.
        The cached state is only changed when the command succeeded.
        """
        if self.enabled(i) is not False:
            return
        print("Enabling", i)
        if self._run_with_retry(FPGA_COMMAND_ENABLE, self.fpgas[i].id, FPGA_COMMAND_ENABLE_RETRY):
            self.fpgas[i].enabled = True
        else:
            print("Unable to enable the device")

    def disable(self, i):
        """
        Disable board 'i'

        Does nothing unless the board is currently known to be enabled.
        The cached state is only changed when the command succeeded.
        """
        if self.enabled(i) is not True:
            return
        print("Disabling", i)
        if self._run_with_retry(FPGA_COMMAND_DISABLE, self.fpgas[i].id, FPGA_COMMAND_DISABLE_RETRY):
            self.fpgas[i].enabled = False
        else:
            print("Unable to disable the device")

    def get_state(self):
        """
        returns all the current states of all boards
        as a list of (index, enabled) tuples
        """
        return [(i, self.enabled(i)) for i in self]

    def __len__(self):
        """
        the length of this object is the number of available fpgas
        for convenience
        """
        return len(self.fpgas)

    def __iter__(self):
        """
        Iterating this object is the same as iterating range(len(self))
        for convenience
        """
        return iter(range(len(self)))