-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbluetoothtest.py
293 lines (263 loc) · 10.9 KB
/
bluetoothtest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
import time
import pexpect
import subprocess
import sys
import os
import syslog
def bluetooth_preInit():
status = 'error'
print "bluetooth_preInit!"
# Initialize bluetooth device
child = pexpect.spawn("bb-wl18xx-bluetooth")
out = child.expect(["Device setup complete", "Can't initialize device: Success", "Initialization timed out", pexpect.EOF])
print "bb-wl18xx-bluetooth result: ", out
if 2 > out:
return 'ok'
if 2 == out:
child = pexpect.spawn("bb-wl18xx-bluetooth")
out = child.expect(["Device setup complete", "Can't initialize device: Success", "Initialization timed out", pexpect.EOF])
print "bb-wl18xx-bluetooth result: ", out
if 1 < out:
return status
# Remove all bluetooth mac adress info
# os.system("rm /var/lib/bluetooth/* -rf")
# Restart bluetooth service
#os.system("systemctl restart bluetooth")
# subprocess.call(['systemctl', 'restart', 'bluetooth'])
#time.sleep(5)
# results = os.popen("ls /var/lib/bluetooth/").read()
# print "len(results): ", len(results)
# if 18 == len(results):
status = 'ok'
return status
class BluetoothctlError(Exception):
"""This exception is raised, when bluetoothctl fails to start."""
pass
class Bluetoothctl:
"""A wrapper for bluetoothctl utility."""
def __init__(self):
f = file("./plog.out", 'w')
# Start to open bluetoothctl
out = subprocess.check_output("rfkill unblock bluetooth", shell = True)
self.child = pexpect.spawn("bluetoothctl")
# TO-DO
# If run bluetoothctl and can't get beaglebone MAC address, then reinitialize again
self.child.logfile = f
def __del__(self):
self.child.sendline("quit")
# print self.child.before
self.child
print "del function"
def get_output(self, command, pause = 0):
"""Run a command in bluetoothctl prompt, return output as a list of lines."""
self.child.send(command + "\n")
time.sleep(pause)
start_failed = self.child.expect(["bluetooth", pexpect.EOF])
if start_failed:
raise BluetoothctlError("Bluetoothctl failed after running " + command)
return self.child.before.split("\r\n")
def start_scan(self):
"""Start bluetooth scanning process."""
try:
out = self.get_output("scan on")
except BluetoothctlError, e:
print(e)
return None
def make_discoverable(self):
"""Make device discoverable."""
try:
out = self.get_output("discoverable on")
except BluetoothctlError, e:
print(e)
return None
def parse_device_info(self, info_string):
"""Parse a string corresponding to a device."""
device = {}
block_list = ["[\x1b[0;", "removed"]
string_valid = not any(keyword in info_string for keyword in block_list)
if string_valid:
try:
device_position = info_string.index("Device")
except ValueError:
pass
else:
if device_position > -1:
attribute_list = info_string[device_position:].split(" ", 2)
device = {
"mac_address": attribute_list[1],
"name": attribute_list[2]
}
return device
def get_available_devices(self):
"""Return a list of tuples of paired and discoverable devices."""
try:
out = self.get_output("devices")
except BluetoothctlError, e:
print(e)
return None
else:
available_devices = []
for line in out:
device = self.parse_device_info(line)
if device:
available_devices.append(device)
return available_devices
def get_paired_devices(self):
"""Return a list of tuples of paired devices."""
try:
out = self.get_output("paired-devices")
except BluetoothctlError, e:
print(e)
return None
else:
paired_devices = []
for line in out:
device = self.parse_device_info(line)
if device:
paired_devices.append(device)
return paired_devices
def get_discoverable_devices(self):
"""Filter paired devices out of available."""
available = self.get_available_devices()
paired = self.get_paired_devices()
return [d for d in available if d not in paired]
def get_device_info(self, mac_address):
"""Get device info by mac address."""
try:
out = self.get_output("info " + mac_address)
except BluetoothctlError, e:
print(e)
return None
else:
return out
def pair(self, mac_address):
"""Try to pair with a device by mac address."""
try:
out = self.get_output("pair " + mac_address, 4)
except BluetoothctlError, e:
print(e)
return None
else:
res = self.child.expect(["Failed to pair", "Pairing successful", pexpect.EOF])
success = True if res == 1 else False
return success
def remove(self, mac_address):
"""Remove paired device by mac address, return success of the operation."""
try:
out = self.get_output("remove " + mac_address, 3)
except BluetoothctlError, e:
print(e)
return None
else:
res = self.child.expect(["not available", "Device has been removed", pexpect.EOF])
success = True if res == 1 else False
return success
def connect(self, mac_address):
"""Try to connect to a device by mac address."""
try:
out = self.get_output("connect " + mac_address, 2)
except BluetoothctlError, e:
print(e)
return None
else:
res = self.child.expect(["Failed to connect", "Connection successful", pexpect.EOF])
success = True if res == 1 else False
return success
def disconnect(self, mac_address):
"""Try to disconnect to a device by mac address."""
try:
out = self.get_output("disconnect " + mac_address, 2)
except BluetoothctlError, e:
print(e)
return None
else:
res = self.child.expect(["Failed to disconnect", "Successful disconnected", pexpect.EOF])
success = True if res == 1 else False
return success
# def run_test(self,dist_mac_addr = "78:09:73:C8:77:50"):
# def run_test(self,dist_mac_addr = "51:91:E2:BF:AE:47"):
# def run_test(self,dist_mac_addr = "74:51:BA:F2:E4:72"):
def run_test(self,dist_mac_addr):
status = 'error'
print dist_mac_addr
#timestart = time.time()
self.child.sendline("devices")
print "devices"
results = self.child.expect([dist_mac_addr, pexpect.EOF, pexpect.TIMEOUT])
#if the mac_addr dont exist,then scan on
if results != 0
self.child.sendline("scan on")
print "Scan on!"
# time.sleep(10)
results = self.child.expect([dist_mac_addr, "Failed to start", pexpect.EOF, pexpect.TIMEOUT], timeout=60)
if 3 == results:
print "scan timeout"
self.child.sendline("scan off")
return status
if results > 0:
print "scan on result: ", results
return status
#if the mac_addr exist in devices list or in the scan list,then pair
if 0 == results :
# pair device for three times
self.child.sendline("pair " + dist_mac_addr)
results = self.child.expect(["Paired: yes", "Fail", pexpect.EOF])
if 0 != results:
self.child.sendline("pair " + dist_mac_addr)
results = self.child.expect(["Paired: yes", "Fail", pexpect.EOF])
if 0 != results:
self.child.sendline("pair " + dist_mac_addr)
results = self.child.expect(["Paired: yes", "Fail", pexpect.EOF])
if 0!= results:
return status
if 0 == results:
# connect device for three time
print "Pair ok!"
self.child.sendline("connect " + dist_mac_addr)
results = self.child.expect(["Connection successful", "Fail", "org.bluez.Error.Failed", pexpect.EOF])
if 0 < results:
time.sleep(2)
self.child.sendline("connect " + dist_mac_addr)
results = self.child.expect(["Connection successful", "Fail", "org.bluez.Error.Failed", pexpect.EOF])
if 0 < results:
# self.child.sendline("connect " + dist_mac_addr)
# results = self.child.expect(["Connection successful", "Fail", "org.bluez.Error.Failed", pexpect.EOF])
# if 0 < results:
return status
time.sleep(1)
if 0 == results:
# print("connect: " + dist_mac_addr)
# if dist_mac_addr=="51:91:E2:BF:AE:47" :
# print "unit1 play music"
# os.system("mpg123 /root/factory_test/music/unit1.mp3")
# elif dist_mac_addr=="49:7A:10:6F:2D:D4":
# print "unit2 play music"
# os.system("mpg123 /root/factory_test/music/unit2.mp3")
# elif dist_mac_addr=="EF:50:7B:BD:C8:16":
# print "unit3 play music"
# os.system("mpg123 /root/factory_test/music/unit3.mp3")
# elif dist_mac_addr=="3A:6B:13:A0:32:B6":
# print "unit4 play music"
# os.system("mpg123 /root/factory_test/music/unit4.mp3")
# elif dist_mac_addr=="78:09:73:C8:77:50":
# print "unit5 play music"
# os.system("mpg123 /root/factory_test/music/unit5.mp3")
# elif dist_mac_addr=="FC:CD:B3:39:96:66":
# print "unit6 play music"
# os.system("mpg123 /root/factory_test/music/unit6.mp3")
# time.sleep(2)
# Disconnect bluetooth
# self.child.sendline("disconnect " + dist_mac_addr)
status = 'ok'
time.sleep(2)
# self.child.sendline("quit")
return status
if __name__ == "__main__":
if 'ok' == bluetooth_preInit():
dist_mac_addr=sys.stdin.readline()
print dist_mac_addr
bl = Bluetoothctl()
result = bl.run_test(dist_mac_addr)
print "bluetooth test result: ", result
else:
print "error"