diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 2d74dd92..90d3a74c 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -784,22 +784,34 @@ def subscribe(self, topic, qos=0): stamp = time.monotonic() while True: op = self._wait_for_msg() - if op == 0x90: - rc = self._sock_exact_recv(4) - assert rc[1] == packet[2] and rc[2] == packet[3] - if rc[3] == 0x80: - raise MMQTTException("SUBACK Failure!") - for t, q in topics: - if self.on_subscribe is not None: - self.on_subscribe(self, self._user_data, t, q) - self._subscribed_topics.append(t) - return - if op is None: if time.monotonic() - stamp > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) + else: + if op == 0x90: + rc = self._sock_exact_recv(3) + # Check packet identifier. + assert rc[1] == packet[2] and rc[2] == packet[3] + remaining_len = rc[0] - 2 + assert remaining_len > 0 + rc = self._sock_exact_recv(remaining_len) + for i in range(0, remaining_len): + if rc[i] not in [0, 1, 2]: + raise MMQTTException( + f"SUBACK Failure for topic {topics[i][0]}: {hex(rc[i])}" + ) + + for t, q in topics: + if self.on_subscribe is not None: + self.on_subscribe(self, self._user_data, t, q) + self._subscribed_topics.append(t) + return + + raise MMQTTException( + f"invalid message received as response to SUBSCRIBE: {hex(op)}" + ) def unsubscribe(self, topic): """Unsubscribes from a MQTT topic. @@ -838,22 +850,26 @@ def unsubscribe(self, topic): while True: stamp = time.monotonic() op = self._wait_for_msg() - if op == 176: - rc = self._sock_exact_recv(3) - assert rc[0] == 0x02 - # [MQTT-3.32] - assert rc[1] == packet_id_bytes[0] and rc[2] == packet_id_bytes[1] - for t in topics: - if self.on_unsubscribe is not None: - self.on_unsubscribe(self, self._user_data, t, self._pid) - self._subscribed_topics.remove(t) - return - if op is None: if time.monotonic() - stamp > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) + else: + if op == 176: + rc = self._sock_exact_recv(3) + assert rc[0] == 0x02 + # [MQTT-3.32] + assert rc[1] == packet_id_bytes[0] and rc[2] == packet_id_bytes[1] + for t in topics: + if self.on_unsubscribe is not None: + self.on_unsubscribe(self, self._user_data, t, self._pid) + self._subscribed_topics.remove(t) + return + + raise MMQTTException( + f"invalid message received as response to UNSUBSCRIBE: {hex(op)}" + ) def _recompute_reconnect_backoff(self): """ @@ -992,6 +1008,7 @@ def _wait_for_msg(self, timeout=0.1): return MQTT_PINGRESP if res[0] & MQTT_PKT_TYPE_MASK != MQTT_PUBLISH: + self.logger.debug(f"Got message type: {hex(res[0])}") return res[0] # Handle only the PUBLISH packet type from now on.