Skip to content

Commit

Permalink
fixing type hints errors on python controller code (#34415)
Browse files Browse the repository at this point in the history
* fixing mypy errors

* restyled

* add new files to BUILD.gn

* mypy: fixing p256keypair.py

* more mypy fixes

* adding workaround to allow type hinting ctypes pointers

* more mypy fixes

* more mypy fixes

* Restyled by autopep8

---------

Co-authored-by: Restyled.io <commits@restyled.io>
  • Loading branch information
2 people authored and pull[bot] committed Nov 7, 2024
1 parent 5e484c2 commit 7f27d94
Show file tree
Hide file tree
Showing 13 changed files with 172 additions and 169 deletions.
2 changes: 2 additions & 0 deletions src/controller/python/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ chip_python_wheel_action("chip-core") {
"chip/commissioning/commissioning_flow_blocks.py",
"chip/commissioning/pase.py",
"chip/configuration/__init__.py",
"chip/credentials/__init__.py",
"chip/credentials/cert.py",
"chip/crypto/__init__.py",
"chip/crypto/fabric.py",
"chip/crypto/p256keypair.py",
"chip/discovery/__init__.py",
Expand Down
92 changes: 45 additions & 47 deletions src/controller/python/chip/ChipDeviceCtrl.py

Large diffs are not rendered by default.

142 changes: 65 additions & 77 deletions src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@
from ctypes import CFUNCTYPE, POINTER, c_size_t, c_uint8, c_uint16, c_uint32, c_uint64, c_void_p, cast, py_object
from dataclasses import dataclass, field
from enum import Enum, unique
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

import chip
import chip.exceptions
import chip.interaction_model
import chip.tlv
import construct
import construct # type: ignore
from chip.interaction_model import PyWriteAttributeData
from chip.native import ErrorSDKPart, PyChipError
from rich.pretty import pprint
from rich.pretty import pprint # type: ignore

from .ClusterObjects import Cluster, ClusterAttributeDescriptor, ClusterEvent

Expand All @@ -58,9 +58,9 @@ class EventPriority(Enum):

@dataclass(frozen=True)
class AttributePath:
EndpointId: int = None
ClusterId: int = None
AttributeId: int = None
EndpointId: Optional[int] = None
ClusterId: Optional[int] = None
AttributeId: Optional[int] = None

@staticmethod
def from_cluster(EndpointId: int, Cluster: Cluster) -> AttributePath:
Expand All @@ -80,12 +80,12 @@ def __str__(self) -> str:

@dataclass(frozen=True)
class DataVersionFilter:
EndpointId: int = None
ClusterId: int = None
DataVersion: int = None
EndpointId: Optional[int] = None
ClusterId: Optional[int] = None
DataVersion: Optional[int] = None

@staticmethod
def from_cluster(EndpointId: int, Cluster: Cluster, DataVersion: int = None) -> AttributePath:
def from_cluster(EndpointId: int, Cluster: Cluster, DataVersion: int) -> DataVersionFilter:
if Cluster is None:
raise ValueError("Cluster cannot be None")
return DataVersionFilter(EndpointId=EndpointId, ClusterId=Cluster.id, DataVersion=DataVersion)
Expand All @@ -99,70 +99,67 @@ class TypedAttributePath:
''' Encapsulates an attribute path that has strongly typed references to cluster and attribute
cluster object types. These types serve as keys into the attribute cache.
'''
ClusterType: Cluster = None
AttributeType: ClusterAttributeDescriptor = None
AttributeName: str = None
Path: AttributePath = None

def __init__(self, ClusterType: Cluster = None, AttributeType: ClusterAttributeDescriptor = None,
Path: AttributePath = None):
''' Only one of either ClusterType and AttributeType OR Path may be provided.
'''

#
# First, let's populate ClusterType and AttributeType. If it's already provided,
# we can continue onwards to deriving the label. Otherwise, we'll need to
# walk the attribute index to find the right type information.
#
if (ClusterType is not None and AttributeType is not None):
self.ClusterType = ClusterType
self.AttributeType = AttributeType
else:
if (Path is None):
raise ValueError("Path should have a valid value")

ClusterType: Optional[Cluster] = None
AttributeType: Optional[ClusterAttributeDescriptor] = None
AttributeName: Optional[str] = None
Path: Optional[AttributePath] = None
ClusterId: Optional[int] = None
AttributeId: Optional[int] = None

def __post_init__(self):
'''Only one of either ClusterType and AttributeType OR Path may be provided.'''

if (self.ClusterType is not None and self.AttributeType is not None) and self.Path is not None:
raise ValueError("Only one of either ClusterType and AttributeType OR Path may be provided.")
if (self.ClusterType is None or self.AttributeType is None) and self.Path is None:
raise ValueError("Either ClusterType and AttributeType OR Path must be provided.")

# if ClusterType and AttributeType were provided we can continue onwards to deriving the label.
# Otherwise, we'll need to walk the attribute index to find the right type information.

# If Path is provided, derive ClusterType and AttributeType from it
if self.Path is not None:
for cluster, attribute in _AttributeIndex:
attributeType = _AttributeIndex[(cluster, attribute)][0]
clusterType = _AttributeIndex[(cluster, attribute)][1]

if (clusterType.id == Path.ClusterId and attributeType.attribute_id == Path.AttributeId):
if clusterType.id == self.Path.ClusterId and attributeType.attribute_id == self.Path.AttributeId:
self.ClusterType = clusterType
self.AttributeType = attributeType
break

if (self.ClusterType is None or self.AttributeType is None):
raise KeyError(f"No Schema found for Attribute {Path}")
if self.ClusterType is None or self.AttributeType is None:
raise KeyError(f"No Schema found for Attribute {self.Path}")

# Next, let's figure out the label.
for c_field in self.ClusterType.descriptor.Fields:
if (c_field.Tag != self.AttributeType.attribute_id):
if c_field.Tag != self.AttributeType.attribute_id:
continue

self.AttributeName = c_field.Label

if (self.AttributeName is None):
raise KeyError(f"Unable to resolve name for Attribute {Path}")
if self.AttributeName is None:
raise KeyError(f"Unable to resolve name for Attribute {self.Path}")

self.Path = Path
self.ClusterId = self.ClusterType.id
self.AttributeId = self.AttributeType.attribute_id


@dataclass(frozen=True)
class EventPath:
EndpointId: int = None
ClusterId: int = None
EventId: int = None
Urgent: int = None
EndpointId: Optional[int] = None
ClusterId: Optional[int] = None
EventId: Optional[int] = None
Urgent: Optional[int] = None

@staticmethod
def from_cluster(EndpointId: int, Cluster: Cluster, EventId: int = None, Urgent: int = None) -> "EventPath":
def from_cluster(EndpointId: int, Cluster: Cluster, EventId: Optional[int] = None, Urgent: Optional[int] = None) -> "EventPath":
if Cluster is None:
raise ValueError("Cluster cannot be None")
return EventPath(EndpointId=EndpointId, ClusterId=Cluster.id, EventId=EventId, Urgent=Urgent)

@staticmethod
def from_event(EndpointId: int, Event: ClusterEvent, Urgent: int = None) -> "EventPath":
def from_event(EndpointId: int, Event: ClusterEvent, Urgent: Optional[int] = None) -> "EventPath":
if Event is None:
raise ValueError("Event cannot be None")
return EventPath(EndpointId=EndpointId, ClusterId=Event.cluster_id, EventId=Event.event_id, Urgent=Urgent)
Expand All @@ -173,23 +170,13 @@ def __str__(self) -> str:

@dataclass
class EventHeader:
EndpointId: int = None
ClusterId: int = None
EventId: int = None
EventNumber: int = None
Priority: EventPriority = None
Timestamp: int = None
TimestampType: EventTimestampType = None

def __init__(self, EndpointId: int = None, ClusterId: int = None,
EventId: int = None, EventNumber=None, Priority=None, Timestamp=None, TimestampType=None):
self.EndpointId = EndpointId
self.ClusterId = ClusterId
self.EventId = EventId
self.EventNumber = EventNumber
self.Priority = Priority
self.Timestamp = Timestamp
self.TimestampType = TimestampType
EndpointId: Optional[int] = None
ClusterId: Optional[int] = None
EventId: Optional[int] = None
EventNumber: Optional[int] = None
Priority: Optional[EventPriority] = None
Timestamp: Optional[int] = None
TimestampType: Optional[EventTimestampType] = None

def __str__(self) -> str:
return (f"{self.EndpointId}/{self.ClusterId}/{self.EventId}/"
Expand Down Expand Up @@ -247,7 +234,7 @@ class ValueDecodeFailure:
'''

TLVValue: Any = None
Reason: Exception = None
Reason: Optional[Exception] = None


@dataclass
Expand Down Expand Up @@ -431,15 +418,16 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType):

class SubscriptionTransaction:
def __init__(self, transaction: AsyncReadTransaction, subscriptionId, devCtrl):
self._onResubscriptionAttemptedCb = DefaultResubscriptionAttemptedCallback
self._onAttributeChangeCb = DefaultAttributeChangeCallback
self._onEventChangeCb = DefaultEventChangeCallback
self._onErrorCb = DefaultErrorCallback
self._onResubscriptionAttemptedCb: Callable[[SubscriptionTransaction,
int, int], None] = DefaultResubscriptionAttemptedCallback
self._onAttributeChangeCb: Callable[[TypedAttributePath, SubscriptionTransaction], None] = DefaultAttributeChangeCallback
self._onEventChangeCb: Callable[[EventReadResult, SubscriptionTransaction], None] = DefaultEventChangeCallback
self._onErrorCb: Callable[[int, SubscriptionTransaction], None] = DefaultErrorCallback
self._readTransaction = transaction
self._subscriptionId = subscriptionId
self._devCtrl = devCtrl
self._isDone = False
self._onResubscriptionSucceededCb = None
self._onResubscriptionSucceededCb: Optional[Callable[[SubscriptionTransaction], None]] = None
self._onResubscriptionSucceededCb_isAsync = False
self._onResubscriptionAttemptedCb_isAsync = False

Expand Down Expand Up @@ -647,10 +635,10 @@ def __init__(self, future: Future, eventLoop, devCtrl, returnClusterObject: bool
self._event_loop = eventLoop
self._future = future
self._subscription_handler = None
self._events = []
self._events: List[EventReadResult] = []
self._devCtrl = devCtrl
self._cache = AttributeCache(returnClusterObject=returnClusterObject)
self._changedPathSet = set()
self._changedPathSet: Set[AttributePath] = set()
self._pReadClient = None
self._pReadCallback = None
self._resultError: Optional[PyChipError] = None
Expand Down Expand Up @@ -809,7 +797,7 @@ class AsyncWriteTransaction:
def __init__(self, future: Future, eventLoop):
self._event_loop = eventLoop
self._future = future
self._resultData = []
self._resultData: List[AttributeWriteResult] = []
self._resultError: Optional[PyChipError] = None

def handleResponse(self, path: AttributePath, status: int):
Expand Down Expand Up @@ -1014,9 +1002,9 @@ def WriteGroupAttributes(groupId: int, devCtrl: c_void_p, attributes: List[Attri


def Read(future: Future, eventLoop, device, devCtrl,
attributes: List[AttributePath] = None, dataVersionFilters: List[DataVersionFilter] = None,
events: List[EventPath] = None, eventNumberFilter: Optional[int] = None, returnClusterObject: bool = True,
subscriptionParameters: SubscriptionParameters = None,
attributes: Optional[List[AttributePath]] = None, dataVersionFilters: Optional[List[DataVersionFilter]] = None,
events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None, returnClusterObject: bool = True,
subscriptionParameters: Optional[SubscriptionParameters] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True) -> PyChipError:
if (not attributes) and dataVersionFilters:
raise ValueError(
Expand Down Expand Up @@ -1132,9 +1120,9 @@ def Read(future: Future, eventLoop, device, devCtrl,


def ReadAttributes(future: Future, eventLoop, device, devCtrl,
attributes: List[AttributePath], dataVersionFilters: List[DataVersionFilter] = None,
attributes: List[AttributePath], dataVersionFilters: Optional[List[DataVersionFilter]] = None,
returnClusterObject: bool = True,
subscriptionParameters: SubscriptionParameters = None, fabricFiltered: bool = True) -> int:
subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
return Read(future=future, eventLoop=eventLoop, device=device,
devCtrl=devCtrl, attributes=attributes, dataVersionFilters=dataVersionFilters,
events=None, returnClusterObject=returnClusterObject,
Expand All @@ -1143,7 +1131,7 @@ def ReadAttributes(future: Future, eventLoop, device, devCtrl,

def ReadEvents(future: Future, eventLoop, device, devCtrl,
events: List[EventPath], eventNumberFilter=None, returnClusterObject: bool = True,
subscriptionParameters: SubscriptionParameters = None, fabricFiltered: bool = True) -> int:
subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
return Read(future=future, eventLoop=eventLoop, device=device, devCtrl=devCtrl, attributes=None,
dataVersionFilters=None, events=events, eventNumberFilter=eventNumberFilter,
returnClusterObject=returnClusterObject,
Expand Down
2 changes: 1 addition & 1 deletion src/controller/python/chip/clusters/CHIPClusters.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions src/controller/python/chip/clusters/ClusterObjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from chip import ChipUtility, tlv
from chip.clusters.Types import Nullable, NullValue
from dacite import from_dict
from dacite import from_dict # type: ignore


def GetUnionUnderlyingType(typeToCheck, matchingType=None):
Expand Down Expand Up @@ -52,8 +52,8 @@ def GetUnionUnderlyingType(typeToCheck, matchingType=None):
@dataclass
class ClusterObjectFieldDescriptor:
Label: str = ''
Tag: int = None
Type: type = None
Tag: typing.Optional[int] = None
Type: type = type(None)

def _PutSingleElementToTLV(self, tag, val, elementType, writer: tlv.TLVWriter, debugPath: str = '?'):
if issubclass(elementType, ClusterObject):
Expand Down Expand Up @@ -113,13 +113,13 @@ def PutFieldToTLV(self, tag, val, writer: tlv.TLVWriter, debugPath: str = '?'):
class ClusterObjectDescriptor:
Fields: List[ClusterObjectFieldDescriptor]

def GetFieldByTag(self, tag: int) -> ClusterObjectFieldDescriptor:
def GetFieldByTag(self, tag: int) -> typing.Optional[ClusterObjectFieldDescriptor]:
for _field in self.Fields:
if _field.Tag == tag:
return _field
return None

def GetFieldByLabel(self, label: str) -> ClusterObjectFieldDescriptor:
def GetFieldByLabel(self, label: str) -> typing.Optional[ClusterObjectFieldDescriptor]:
for _field in self.Fields:
if _field.Label == label:
return _field
Expand All @@ -140,7 +140,7 @@ def _ConvertNonArray(self, debugPath: str, elementType, value: Any) -> Any:
return elementType.descriptor.TagDictToLabelDict(debugPath, value)

def TagDictToLabelDict(self, debugPath: str, tlvData: Dict[int, Any]) -> Dict[str, Any]:
ret = {}
ret: typing.Dict[Any, Any] = {}
for tag, value in tlvData.items():
descriptor = self.GetFieldByTag(tag)
if not descriptor:
Expand All @@ -156,7 +156,7 @@ def TagDictToLabelDict(self, debugPath: str, tlvData: Dict[int, Any]) -> Dict[st
realType = GetUnionUnderlyingType(descriptor.Type)
if (realType is None):
raise ValueError(
f"Field {debugPath}.{self.Label} has no valid underlying data model type")
f"Field {debugPath}.{descriptor.Label} has no valid underlying data model type")

valueType = realType
else:
Expand All @@ -175,7 +175,7 @@ def TagDictToLabelDict(self, debugPath: str, tlvData: Dict[int, Any]) -> Dict[st

def TLVToDict(self, tlvBuf: bytes) -> Dict[str, Any]:
tlvData = tlv.TLVReader(tlvBuf).get().get('Any', {})
return self.TagDictToLabelDict([], tlvData)
return self.TagDictToLabelDict('', tlvData)

def DictToTLVWithWriter(self, debugPath: str, tag, data: Mapping, writer: tlv.TLVWriter):
writer.startStructure(tag)
Expand Down Expand Up @@ -209,11 +209,11 @@ def descriptor(cls):

# The below dictionaries will be filled dynamically
# and are used for quick lookup/mapping from cluster/attribute id to the correct class
ALL_CLUSTERS = {}
ALL_ATTRIBUTES = {}
ALL_CLUSTERS: typing.Dict = {}
ALL_ATTRIBUTES: typing.Dict = {}
# These need to be separate because there can be overlap in command ids for commands and responses.
ALL_ACCEPTED_COMMANDS = {}
ALL_GENERATED_COMMANDS = {}
ALL_ACCEPTED_COMMANDS: typing.Dict = {}
ALL_GENERATED_COMMANDS: typing.Dict = {}


class ClusterCommand(ClusterObject):
Expand Down
2 changes: 1 addition & 1 deletion src/controller/python/chip/clusters/enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from threading import Lock

from aenum import IntEnum, extend_enum
from aenum import IntEnum, extend_enum # type: ignore

# Flag on whether we should map unknown enum values to kUnknownEnumValue.
_map_missing_enum_to_unknown_enum_value = True
Expand Down
Empty file.
Empty file.
Loading

0 comments on commit 7f27d94

Please sign in to comment.