Skip to content

Commit

Permalink
esp_prov: make service_name optional for BLE
Browse files Browse the repository at this point in the history
and allow interactive scanning and selection of device
  • Loading branch information
ccutrer committed Jun 8, 2022
1 parent 1d78bba commit 0f3d9ee
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 17 deletions.
2 changes: 0 additions & 2 deletions tools/esp_prov/esp_prov.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ async def get_transport(sel_transport, service_name):
service_name = '192.168.4.1:80'
tp = transport.Transport_HTTP(service_name)
elif (sel_transport == 'ble'):
if service_name is None:
raise RuntimeError('"--service_name" must be specified for ble transport')
# BLE client is now capable of automatically figuring out
# the primary service from the advertisement data and the
# characteristics corresponding to each endpoint.
Expand Down
66 changes: 51 additions & 15 deletions tools/esp_prov/transport/ble_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

# --------------------------------------------------------------------

def device_sort(device):
return device.address


class BLE_Bleak_Client:
def __init__(self):
self.adapter_props = None
Expand All @@ -45,23 +49,56 @@ async def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
raise RuntimeError('Bluetooth is not ready. Maybe try `bluetoothctl power on`?')
raise

address = None
for d in devices:
if d.name == self.devname:
address = d.address
uuids = d.metadata['uuids']
# There should be 1 service UUID in advertising data
# If bluez had cached an old version of the advertisement data
# the list of uuids may be incorrect, in which case connection
# or service discovery may fail the first time. If that happens
# the cache will be refreshed before next retry
if len(uuids) == 1:
self.srv_uuid_adv = uuids[0]
if not address:
found_device = None

if self.devname is None:
if len(devices) == 0:
print('No devices found!')
exit(1)

while True:
devices.sort(key=device_sort)
print('==== BLE Discovery results ====')
print('{0: >4} {1: <33} {2: <12}'.format(
'S.N.', 'Name', 'Address'))
for i in range(len(devices)):
print('[{0: >2}] {1: <33} {2: <12}'.format(i + 1, devices[i].name, devices[i].address))

while True:
try:
select = int(input('Select device by number (0 to rescan) : '))
if select < 0 or select > len(devices):
raise ValueError
break
except ValueError:
print('Invalid input! Retry')

if select != 0:
break

devices = await bleak.discover()

self.devname = devices[select - 1].name
found_device = devices[select - 1]
else:
for d in devices:
if d.name == self.devname:
found_device = d

if not found_device:
raise RuntimeError('Device not found')

uuids = found_device.metadata['uuids']
# There should be 1 service UUID in advertising data
# If bluez had cached an old version of the advertisement data
# the list of uuids may be incorrect, in which case connection
# or service discovery may fail the first time. If that happens
# the cache will be refreshed before next retry
if len(uuids) == 1:
self.srv_uuid_adv = uuids[0]

print('Connecting...')
self.device = bleak.BleakClient(address)
self.device = bleak.BleakClient(found_device.address)
await self.device.connect()

print('Getting Services...')
Expand Down Expand Up @@ -116,7 +153,6 @@ async def send_data(self, characteristic_uuid, data):
await self.device.write_gatt_char(characteristic_uuid, bytearray(data.encode('latin-1')), True)
readval = await self.device.read_gatt_char(characteristic_uuid)
return ''.join(chr(b) for b in readval)

# --------------------------------------------------------------------


Expand Down

0 comments on commit 0f3d9ee

Please sign in to comment.