From a42395b6000c4ebf58b2cd966da0ff427d692463 Mon Sep 17 00:00:00 2001 From: Alvipe Date: Mon, 6 Mar 2017 12:23:17 +0100 Subject: [PATCH 1/2] Add a power off function I have added a function to power off the Myo Armband (actually, according to the official BLE specification, the 0x04 command puts the Myo into deep sleep, there is no way to completely turn the device off). I think this is a very useful feature since, without this function, you have to wait until the Myo battery is fully discharged, or use the official Myo app for Windows or Mac and turn off the device from there. The base script is the myo_raw.py modified by Fernando Cosentino, as the original script by dzhu throws some errors. --- myo_raw.py | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/myo_raw.py b/myo_raw.py index 392c215..fa7bf4d 100644 --- a/myo_raw.py +++ b/myo_raw.py @@ -1,3 +1,15 @@ +''' + Original by dzhu + https://github.com/dzhu/myo-raw + + Edited by Fernando Cosentino + http://www.fernandocosentino.net/pyoconnect + + Edited by Alvaro Villoslada (Alvipe) + https://github.com/Alvipe/myo-raw +''' + + from __future__ import print_function import enum @@ -43,7 +55,6 @@ class Pose(enum.Enum): THUMB_TO_PINKY = 5 UNKNOWN = 255 - class Packet(object): def __init__(self, ords): self.typ = ords[0] @@ -56,7 +67,6 @@ def __repr__(self): (self.typ, self.cls, self.cmd, ' '.join('%02X' % b for b in multiord(self.payload))) - class BT(object): '''Implements the non-Myo-specific details of the Bluetooth protocol.''' def __init__(self, tty): @@ -166,7 +176,6 @@ def send_command(self, cls, cmd, payload=b'', wait_resp=True): ## not a response: must be an event self.handle_event(p) - class MyoRaw(object): '''Implements the Myo-specific communication protocol.''' @@ -288,7 +297,7 @@ def handle_data(p): gyro = vals[7:10] self.on_imu(quat, acc, gyro) elif attr == 0x23: - typ, val, xdir = unpack('3B', pay) + typ, val, xdir, _,_,_ = unpack('6B', pay) if typ == 1: # on arm self.on_arm(Arm(val), XDirection(xdir)) @@ -315,13 +324,16 @@ def disconnect(self): if self.conn is not None: self.bt.disconnect(self.conn) + def power_off(self): + self.write_attr(0x19, b'\x04\x00') + def start_raw(self): '''Sending this sequence for v1.0 firmware seems to enable both raw data and pose notifications. ''' self.write_attr(0x28, b'\x01\x00') - self.write_attr(0x19, b'\x01\x03\x01\x01\x00') + #self.write_attr(0x19, b'\x01\x03\x01\x01\x00') self.write_attr(0x19, b'\x01\x03\x01\x01\x01') def mc_start_collection(self): @@ -367,7 +379,6 @@ def vibrate(self, length): ## first byte tells it to vibrate; purpose of second byte is unknown self.write_attr(0x19, pack('3B', 3, 1, length)) - def add_emg_handler(self, h): self.emg_handlers.append(h) @@ -380,7 +391,6 @@ def add_pose_handler(self, h): def add_arm_handler(self, h): self.arm_handlers.append(h) - def on_emg(self, emg, moving): for h in self.emg_handlers: h(emg, moving) @@ -397,7 +407,6 @@ def on_arm(self, arm, xdir): for h in self.arm_handlers: h(arm, xdir) - if __name__ == '__main__': try: import pygame @@ -407,7 +416,7 @@ def on_arm(self, arm, xdir): HAVE_PYGAME = False if HAVE_PYGAME: - w, h = 1200, 400 + w, h = 800, 600 scr = pygame.display.set_mode((w, h)) last_vals = None @@ -475,5 +484,7 @@ def proc_emg(emg, moving, times=[]): except KeyboardInterrupt: pass finally: - m.disconnect() - print() + m.power_off() + print("Power off") + # m.disconnect() + # print("Disconnected") From 680775071d0dd4defb88b35f91d9122c0645eedb Mon Sep 17 00:00:00 2001 From: Alvipe Date: Fri, 21 Apr 2017 14:57:05 +0200 Subject: [PATCH 2/2] Modify start_raw() to get raw EMG and add new functionalities Although the original code is supposed to read the raw EMG signals measured by the sensor pods of the Myo, it actually reads rectified and smoothed EMG signals, a measure of their amplitude. The code has been modified to get the real raw signals, by suscribing to the raw EMG notification characteristics by writing to and reading from the corresponding handles listed on the Thalmic Labs BLE specification. A new function to change the color of the logo and bar LEDs has been added. The program is now able to get the battery level in the same manner that it gets the EMG, IMU or classifier data. By suscribing to the battery characteristic, the Myo sends the battery level every time its value changes. --- myo_raw.py | 267 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 188 insertions(+), 79 deletions(-) diff --git a/myo_raw.py b/myo_raw.py index fa7bf4d..e301631 100644 --- a/myo_raw.py +++ b/myo_raw.py @@ -1,9 +1,9 @@ ''' - Original by dzhu - https://github.com/dzhu/myo-raw + Original by dzhu + https://github.com/dzhu/myo-raw - Edited by Fernando Cosentino - http://www.fernandocosentino.net/pyoconnect + Edited by Fernando Cosentino + http://www.fernandocosentino.net/pyoconnect Edited by Alvaro Villoslada (Alvipe) https://github.com/Alvipe/myo-raw @@ -24,28 +24,33 @@ from common import * + def multichr(ords): if sys.version_info[0] >= 3: return bytes(ords) else: return ''.join(map(chr, ords)) + def multiord(b): if sys.version_info[0] >= 3: return list(b) else: return map(ord, b) + class Arm(enum.Enum): UNKNOWN = 0 RIGHT = 1 LEFT = 2 + class XDirection(enum.Enum): UNKNOWN = 0 X_TOWARD_WRIST = 1 X_TOWARD_ELBOW = 2 + class Pose(enum.Enum): REST = 0 FIST = 1 @@ -55,6 +60,7 @@ class Pose(enum.Enum): THUMB_TO_PINKY = 5 UNKNOWN = 255 + class Packet(object): def __init__(self, ords): self.typ = ords[0] @@ -67,6 +73,7 @@ def __repr__(self): (self.typ, self.cls, self.cmd, ' '.join('%02X' % b for b in multiord(self.payload))) + class BT(object): '''Implements the non-Myo-specific details of the Bluetooth protocol.''' def __init__(self, tty): @@ -75,14 +82,16 @@ def __init__(self, tty): self.lock = threading.Lock() self.handlers = [] - ## internal data-handling methods + # internal data-handling methods def recv_packet(self, timeout=None): t0 = time.time() self.ser.timeout = None while timeout is None or time.time() < t0 + timeout: - if timeout is not None: self.ser.timeout = t0 + timeout - time.time() + if timeout is not None: + self.ser.timeout = t0 + timeout - time.time() c = self.ser.read() - if not c: return None + if not c: + return None ret = self.proc_byte(ord(c)) if ret: @@ -95,13 +104,14 @@ def recv_packets(self, timeout=.5): t0 = time.time() while time.time() < t0 + timeout: p = self.recv_packet(t0 + timeout - time.time()) - if not p: return res + if not p: + return res res.append(p) return res def proc_byte(self, c): if not self.buf: - if c in [0x00, 0x80, 0x08, 0x88]: + if c in [0x00, 0x80, 0x08, 0x88]: # [BLE response pkt, BLE event pkt, wifi response pkt, wifi event pkt] self.buf.append(c) return None elif len(self.buf) == 1: @@ -125,11 +135,14 @@ def add_handler(self, h): self.handlers.append(h) def remove_handler(self, h): - try: self.handlers.remove(h) - except ValueError: pass + try: + self.handlers.remove(h) + except ValueError: + pass def wait_event(self, cls, cmd): res = [None] + def h(p): if p.cls == cls and p.cmd == cmd: res[0] = p @@ -139,7 +152,7 @@ def h(p): self.remove_handler(h) return res[0] - ## specific BLE commands + # specific BLE commands def connect(self, addr): return self.send_command(6, 3, pack('6sBHHHH', multichr(addr), 0, 6, 6, 64, 0)) @@ -169,13 +182,13 @@ def send_command(self, cls, cmd, payload=b'', wait_resp=True): while True: p = self.recv_packet() - - ## no timeout, so p won't be None - if p.typ == 0: return p - - ## not a response: must be an event + # no timeout, so p won't be None + if p.typ == 0: + return p + # not a response: must be an event self.handle_event(p) + class MyoRaw(object): '''Implements the Myo-specific communication protocol.''' @@ -191,6 +204,7 @@ def __init__(self, tty=None): self.imu_handlers = [] self.arm_handlers = [] self.pose_handlers = [] + self.battery_handlers = [] def detect_tty(self): for p in comports(): @@ -204,13 +218,13 @@ def run(self, timeout=None): self.bt.recv_packet(timeout) def connect(self): - ## stop everything from before + # stop everything from before self.bt.end_scan() self.bt.disconnect(0) self.bt.disconnect(1) self.bt.disconnect(2) - ## start scanning + # start scanning print('scanning...') self.bt.discover() while True: @@ -222,12 +236,12 @@ def connect(self): break self.bt.end_scan() - ## connect and wait for status event + # connect and wait for status event conn_pkt = self.bt.connect(addr) self.conn = multiord(conn_pkt.payload)[-1] self.bt.wait_event(3, 0) - ## get firmware version + # get firmware version fw = self.read_attr(0x17) _, _, _, _, v0, v1, v2, v3 = unpack('BHBBHHHH', fw.payload) print('firmware version: %d.%d.%d.%d' % (v0, v1, v2, v3)) @@ -235,82 +249,102 @@ def connect(self): self.old = (v0 == 0) if self.old: - ## don't know what these do; Myo Connect sends them, though we get data - ## fine without them + # don't know what these do; Myo Connect sends them, though we get data + # fine without them self.write_attr(0x19, b'\x01\x02\x00\x00') + # Subscribe for notifications from 4 EMG data channels self.write_attr(0x2f, b'\x01\x00') self.write_attr(0x2c, b'\x01\x00') self.write_attr(0x32, b'\x01\x00') self.write_attr(0x35, b'\x01\x00') - ## enable EMG data + # enable EMG data self.write_attr(0x28, b'\x01\x00') - ## enable IMU data + # enable IMU data self.write_attr(0x1d, b'\x01\x00') - ## Sampling rate of the underlying EMG sensor, capped to 1000. If it's - ## less than 1000, emg_hz is correct. If it is greater, the actual - ## framerate starts dropping inversely. Also, if this is much less than - ## 1000, EMG data becomes slower to respond to changes. In conclusion, - ## 1000 is probably a good value. + # Sampling rate of the underlying EMG sensor, capped to 1000. If it's + # less than 1000, emg_hz is correct. If it is greater, the actual + # framerate starts dropping inversely. Also, if this is much less than + # 1000, EMG data becomes slower to respond to changes. In conclusion, + # 1000 is probably a good value. C = 1000 emg_hz = 50 - ## strength of low-pass filtering of EMG data + # strength of low-pass filtering of EMG data emg_smooth = 100 imu_hz = 50 - ## send sensor parameters, or we don't get any data + # send sensor parameters, or we don't get any data self.write_attr(0x19, pack('BBBBHBBBBB', 2, 9, 2, 1, C, emg_smooth, C // emg_hz, imu_hz, 0, 0)) else: name = self.read_attr(0x03) print('device name: %s' % name.payload) - ## enable IMU data + # enable IMU data self.write_attr(0x1d, b'\x01\x00') - ## enable on/off arm notifications + # enable on/off arm notifications self.write_attr(0x24, b'\x02\x00') - - # self.write_attr(0x19, b'\x01\x03\x00\x01\x01') + # enable EMG notifications self.start_raw() + # enable battery notifications + self.write_attr(0x12, b'\x01\x10') - ## add data handlers + # add data handlers def handle_data(p): - if (p.cls, p.cmd) != (4, 5): return + if (p.cls, p.cmd) != (4, 5): + return c, attr, typ = unpack('BHB', p.payload[:4]) pay = p.payload[5:] if attr == 0x27: + # Unpack a 17 byte array, first 16 are 8 unsigned shorts, last one an unsigned char vals = unpack('8HB', pay) - ## not entirely sure what the last byte is, but it's a bitmask that - ## seems to indicate which sensors think they're being moved around or - ## something + # not entirely sure what the last byte is, but it's a bitmask that + # seems to indicate which sensors think they're being moved around or + # something emg = vals[:8] moving = vals[8] self.on_emg(emg, moving) + # Read notification handles corresponding to the for EMG characteristics + elif attr == 0x2b or attr == 0x2e or attr == 0x31 or attr == 0x34: + '''According to http://developerblog.myo.com/myocraft-emg-in-the-bluetooth-protocol/ + each characteristic sends two secuential readings in each update, + so the received payload is split in two samples. According to the + Myo BLE specification, the data type of the EMG samples is int8_t. + ''' + emg1 = struct.unpack('<8b', pay[:8]) + emg2 = struct.unpack('<8b', pay[8:]) + self.on_emg(emg1, 0) + self.on_emg(emg2, 0) + # Read IMU characteristic handle elif attr == 0x1c: vals = unpack('10h', pay) quat = vals[:4] acc = vals[4:7] gyro = vals[7:10] self.on_imu(quat, acc, gyro) + # Read classifier characteristic handle elif attr == 0x23: - typ, val, xdir, _,_,_ = unpack('6B', pay) + typ, val, xdir, _, _, _ = unpack('6B', pay) - if typ == 1: # on arm + if typ == 1: # on arm self.on_arm(Arm(val), XDirection(xdir)) - elif typ == 2: # removed from arm + elif typ == 2: # removed from arm self.on_arm(Arm.UNKNOWN, XDirection.UNKNOWN) - elif typ == 3: # pose + elif typ == 3: # pose self.on_pose(Pose(val)) + # Read battery characteristic handle + elif attr == 0x11: + battery_level = ord(pay) + self.on_battery(battery_level) else: print('data with unknown attr: %02X %s' % (attr, p)) self.bt.add_handler(handle_data) - def write_attr(self, attr, val): if self.conn is not None: self.bt.write_attr(self.conn, attr, val) @@ -324,17 +358,54 @@ def disconnect(self): if self.conn is not None: self.bt.disconnect(self.conn) + def sleep_mode(self, mode): + self.write_attr(0x19, pack('3B', 9, 1, mode)) + def power_off(self): self.write_attr(0x19, b'\x04\x00') def start_raw(self): + + ''' To get raw EMG signals, we subscribe to the four EMG notification + characteristics by writing a 0x0100 command to the corresponding handles. + ''' + self.write_attr(0x2c, b'\x01\x00') # Suscribe to EmgData0Characteristic + self.write_attr(0x2f, b'\x01\x00') # Suscribe to EmgData1Characteristic + self.write_attr(0x32, b'\x01\x00') # Suscribe to EmgData2Characteristic + self.write_attr(0x35, b'\x01\x00') # Suscribe to EmgData3Characteristic + + '''Bytes sent to handle 0x19 (command characteristic) have the following + format: [command, payload_size, EMG mode, IMU mode, classifier mode] + According to the Myo BLE specification, the commands are: + 0x01 -> set EMG and IMU + 0x03 -> 3 bytes of payload + 0x02 -> send 50Hz filtered signals + 0x01 -> send IMU data streams + 0x01 -> send classifier events + ''' + self.write_attr(0x19, b'\x01\x03\x02\x01\x01') + '''Sending this sequence for v1.0 firmware seems to enable both raw data and pose notifications. ''' - self.write_attr(0x28, b'\x01\x00') - #self.write_attr(0x19, b'\x01\x03\x01\x01\x00') - self.write_attr(0x19, b'\x01\x03\x01\x01\x01') + '''By writting a 0x0100 command to handle 0x28, some kind of "hidden" EMG + notification characteristic is activated. This characteristic is not + listed on the Myo services of the offical BLE specification from Thalmic + Labs. Also, in the second line where we tell the Myo to enable EMG and + IMU data streams and classifier events, the 0x01 command wich corresponds + to the EMG mode is not listed on the myohw_emg_mode_t struct of the Myo + BLE specification. + These two lines, besides enabling the IMU and the classifier, enable the + transmission of a stream of low-pass filtered EMG signals from the eight + sensor pods of the Myo armband (the "hidden" mode I mentioned above). + Instead of getting the raw EMG signals, we get rectified and smoothed + signals, a measure of the amplitude of the EMG (which is useful to have + a measure of muscle strength, but are not as useful as a truly raw signal). + ''' + + # self.write_attr(0x28, b'\x01\x00') # Not needed for raw signals + # self.write_attr(0x19, b'\x01\x03\x01\x01\x01') def mc_start_collection(self): '''Myo Connect sends this sequence (or a reordering) when starting data @@ -342,18 +413,18 @@ def mc_start_collection(self): pose notifications. ''' - self.write_attr(0x28, b'\x01\x00') - self.write_attr(0x1d, b'\x01\x00') - self.write_attr(0x24, b'\x02\x00') - self.write_attr(0x19, b'\x01\x03\x01\x01\x01') - self.write_attr(0x28, b'\x01\x00') - self.write_attr(0x1d, b'\x01\x00') - self.write_attr(0x19, b'\x09\x01\x01\x00\x00') - self.write_attr(0x1d, b'\x01\x00') - self.write_attr(0x19, b'\x01\x03\x00\x01\x00') - self.write_attr(0x28, b'\x01\x00') - self.write_attr(0x1d, b'\x01\x00') - self.write_attr(0x19, b'\x01\x03\x01\x01\x00') + self.write_attr(0x28, b'\x01\x00') # Suscribe to EMG notifications + self.write_attr(0x1d, b'\x01\x00') # Suscribe to IMU notifications + self.write_attr(0x24, b'\x02\x00') # Suscribe to classifier indications + self.write_attr(0x19, b'\x01\x03\x01\x01\x01') # Set EMG and IMU, payload size = 3, EMG on, IMU on, classifier on + self.write_attr(0x28, b'\x01\x00') # Suscribe to EMG notifications + self.write_attr(0x1d, b'\x01\x00') # Suscribe to IMU notifications + self.write_attr(0x19, b'\x09\x01\x01\x00\x00') # Set sleep mode, payload size = 1, never go to sleep, don't know, don't know + self.write_attr(0x1d, b'\x01\x00') # Suscribe to IMU notifications + self.write_attr(0x19, b'\x01\x03\x00\x01\x00') # Set EMG and IMU, payload size = 3, EMG off, IMU on, classifier off + self.write_attr(0x28, b'\x01\x00') # Suscribe to EMG notifications + self.write_attr(0x1d, b'\x01\x00') # Suscribe to IMU notifications + self.write_attr(0x19, b'\x01\x03\x01\x01\x00') # Set EMG and IMU, payload size = 3, EMG on, IMU on, classifier off def mc_end_collection(self): '''Myo Connect sends this sequence (or a reordering) when ending data collection @@ -376,9 +447,16 @@ def mc_end_collection(self): def vibrate(self, length): if length in xrange(1, 4): - ## first byte tells it to vibrate; purpose of second byte is unknown + # first byte tells it to vibrate; purpose of second byte is unknown (payload size?) self.write_attr(0x19, pack('3B', 3, 1, length)) + def set_leds(self, logo, line): + self.write_attr(0x19, pack('8B', 6, 6, *(logo + line))) + + # def get_battery_level(self): + # battery_level = self.read_attr(0x11) + # return ord(battery_level.payload[5]) + def add_emg_handler(self, h): self.emg_handlers.append(h) @@ -391,6 +469,9 @@ def add_pose_handler(self, h): def add_arm_handler(self, h): self.arm_handlers.append(h) + def add_battery_handler(self, h): + self.battery_handlers.append(h) + def on_emg(self, emg, moving): for h in self.emg_handlers: h(emg, moving) @@ -407,6 +488,10 @@ def on_arm(self, arm, xdir): for h in self.arm_handlers: h(arm, xdir) + def on_battery(self, battery_level): + for h in self.battery_handlers: + h(battery_level) + if __name__ == '__main__': try: import pygame @@ -420,8 +505,9 @@ def on_arm(self, arm, xdir): scr = pygame.display.set_mode((w, h)) last_vals = None + def plot(scr, vals): - DRAW_LINES = False + DRAW_LINES = True global last_vals if last_vals is None: @@ -430,18 +516,18 @@ def plot(scr, vals): D = 5 scr.scroll(-D) - scr.fill((0,0,0), (w - D, 0, w, h)) + scr.fill((0, 0, 0), (w - D, 0, w, h)) for i, (u, v) in enumerate(zip(last_vals, vals)): if DRAW_LINES: - pygame.draw.line(scr, (0,255,0), - (w - D, int(h/8 * (i+1 - u))), - (w, int(h/8 * (i+1 - v)))) - pygame.draw.line(scr, (255,255,255), - (w - D, int(h/8 * (i+1))), - (w, int(h/8 * (i+1)))) + pygame.draw.line(scr, (0, 255, 0), + (w - D, int(h/9 * (i+1 - u))), + (w, int(h/9 * (i+1 - v)))) + pygame.draw.line(scr, (255, 255, 255), + (w - D, int(h/9 * (i+1))), + (w, int(h/9 * (i+1)))) else: c = int(255 * max(0, min(1, v))) - scr.fill((c, c, c), (w - D, i * h / 8, D, (i + 1) * h / 8 - i * h / 8)); + scr.fill((c, c, c), (w - D, i * h / 8, D, (i + 1) * h / 8 - i * h / 8)) pygame.display.flip() last_vals = vals @@ -450,22 +536,34 @@ def plot(scr, vals): def proc_emg(emg, moving, times=[]): if HAVE_PYGAME: - ## update pygame display - plot(scr, [e / 2000. for e in emg]) + # update pygame display + plot(scr, [e / 500. for e in emg]) else: print(emg) - ## print framerate of received data + # print framerate of received data times.append(time.time()) if len(times) > 20: - #print((len(times) - 1) / (times[-1] - times[0])) + # print((len(times) - 1) / (times[-1] - times[0])) times.pop(0) + def proc_battery(battery_level): + print("Battery level: %d" % battery_level) + if battery_level < 5: + m.set_leds([255, 0, 0], [255, 0, 0]) + else: + m.set_leds([128, 128, 255], [128, 128, 255]) + m.add_emg_handler(proc_emg) + m.add_battery_handler(proc_battery) m.connect() m.add_arm_handler(lambda arm, xdir: print('arm', arm, 'xdir', xdir)) m.add_pose_handler(lambda p: print('pose', p)) + # m.add_imu_handler(lambda quat, acc, gyro: print('quaternion', quat)) + m.sleep_mode(1) + m.set_leds([128, 128, 255], [128, 128, 255]) # purple logo and bar LEDs + m.vibrate(1) try: while True: @@ -475,6 +573,10 @@ def proc_emg(emg, moving, times=[]): for ev in pygame.event.get(): if ev.type == QUIT or (ev.type == KEYDOWN and ev.unicode == 'q'): raise KeyboardInterrupt() + # elif ev.type == KEYDOWN and ev.unicode == 'd': + # m.disconnect() + # print("Disconnected") + # raise KeyboardInterrupt() elif ev.type == KEYDOWN: if K_1 <= ev.key <= K_3: m.vibrate(ev.key - K_0) @@ -484,7 +586,14 @@ def proc_emg(emg, moving, times=[]): except KeyboardInterrupt: pass finally: - m.power_off() - print("Power off") - # m.disconnect() - # print("Disconnected") + # m.power_off() + # print("Power off") + m.disconnect() + print("Disconnected") + # command = raw_input("Do you want to (d)isconnect or (p)ower off?\n") + # if command == 'd': + # m.disconnect() + # print("Disconnected") + # elif command == 'p': + # m.power_off() + # print("Power off")