Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Python event-loop & object lifetimes (for reads/writes) #19832

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 47 additions & 38 deletions src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ def __init__(self, future: Future, eventLoop, devCtrl, returnClusterObject: bool
self._changedPathSet = set()
self._pReadClient = None
self._pReadCallback = None
self._resultError = None

def SetClientObjPointers(self, pReadClient, pReadCallback):
self._pReadClient = pReadClient
Expand All @@ -608,7 +609,7 @@ def SetClientObjPointers(self, pReadClient, pReadCallback):
def GetAllEventValues(self):
return self._events

def _handleAttributeData(self, path: AttributePathWithListIndex, dataVersion: int, status: int, data: bytes):
def handleAttributeData(self, path: AttributePathWithListIndex, dataVersion: int, status: int, data: bytes):
try:
imStatus = status
try:
Expand All @@ -629,10 +630,7 @@ def _handleAttributeData(self, path: AttributePathWithListIndex, dataVersion: in
except Exception as ex:
logging.exception(ex)

def handleAttributeData(self, path: AttributePath, dataVersion: int, status: int, data: bytes):
self._handleAttributeData(path, dataVersion, status, data)

def _handleEventData(self, header: EventHeader, path: EventPath, data: bytes, status: int):
def handleEventData(self, header: EventHeader, path: EventPath, data: bytes, status: int):
try:
eventType = _EventIndex.get(str(path), None)
eventValue = None
Expand Down Expand Up @@ -671,19 +669,8 @@ def _handleEventData(self, header: EventHeader, path: EventPath, data: bytes, st
except Exception as ex:
logging.exception(ex)

def handleEventData(self, header: EventHeader, path: EventPath, data: bytes, status: int):
self._handleEventData(header, path, data, status)

def _handleError(self, chipError: int):
if not self._future.done():
self._future.set_exception(
chip.exceptions.ChipStackError(chipError))
self._subscription_handler.OnErrorCb(chipError, self._subscription_handler)

def handleError(self, chipError: int):
self._event_loop.call_soon_threadsafe(
self._handleError, chipError
)
self._resultError = chipError

def _handleSubscriptionEstablished(self, subscriptionId):
if not self._future.done():
Expand Down Expand Up @@ -713,9 +700,28 @@ def _handleReportEnd(self):
self._changedPathSet = set()

def _handleDone(self):
#
# We only set the exception/result on the future in this _handleDone call (if it hasn't
# already been set yet, which can be in the case of subscriptions) since doing so earlier
# would result in the callers awaiting the result to
# move on, possibly invalidating the provided _event_loop.
#
if not self._future.done():
self._future.set_result(AsyncReadTransaction.ReadResponse(
attributes=self._cache.attributeCache, events=self._events))
if self._resultError:
if self._subscription_handler:
self._subscription_handler.OnErrorCb(chipError, self._subscription_handler)
else:
self._future.set_exception(chip.exceptions.ChipStackError(chipError))
else:
self._future.set_result(AsyncReadTransaction.ReadResponse(
attributes=self._cache.attributeCache, events=self._events))

#
# Decrement the ref on ourselves to match the increment that happened at allocation.
# This happens synchronously as part of handling done to ensure the object remains valid
# right till the very end.
#
ctypes.pythonapi.Py_DecRef(ctypes.py_object(self))

def handleDone(self):
self._event_loop.call_soon_threadsafe(self._handleDone)
Expand All @@ -732,31 +738,36 @@ class AsyncWriteTransaction:
def __init__(self, future: Future, eventLoop):
self._event_loop = eventLoop
self._future = future
self._res = []
self._resultData = []
self._resultError = None

def _handleResponse(self, path: AttributePath, status: int):
def handleResponse(self, path: AttributePath, status: int):
try:
imStatus = chip.interaction_model.Status(status)
self._res.append(AttributeWriteResult(Path=path, Status=imStatus))
self._resultData.append(AttributeWriteResult(Path=path, Status=imStatus))
except:
self._res.append(AttributeWriteResult(Path=path, Status=status))

def handleResponse(self, path: AttributePath, status: int):
self._event_loop.call_soon_threadsafe(
self._handleResponse, path, status)

def _handleError(self, chipError: int):
self._future.set_exception(
chip.exceptions.ChipStackError(chipError))
self._resultData.append(AttributeWriteResult(Path=path, Status=status))

def handleError(self, chipError: int):
self._event_loop.call_soon_threadsafe(
self._handleError, chipError
)
self._resultError = chipError

def _handleDone(self):
if not self._future.done():
self._future.set_result(self._res)
#
# We only set the exception/result on the future in this _handleDone call,
# since doing so earlier would result in the callers awaiting the result to
# move on, possibly invalidating the provided _event_loop.
#
if self._resultError is not None:
self._future.set_exception(chip.exceptions.ChipStackError(self._resultError))
else:
self._future.set_result(self._resultData)

#
# Decrement the ref on ourselves to match the increment that happened at allocation.
# This happens synchronously as part of handling done to ensure the object remains valid
# right till the very end.
#
ctypes.pythonapi.Py_DecRef(ctypes.py_object(self))

def handleDone(self):
self._event_loop.call_soon_threadsafe(self._handleDone)
Expand Down Expand Up @@ -821,7 +832,6 @@ def _OnReportEndCallback(closure):
@_OnReadDoneCallbackFunct
def _OnReadDoneCallback(closure):
closure.handleDone()
ctypes.pythonapi.Py_DecRef(ctypes.py_object(closure))


_OnWriteResponseCallbackFunct = CFUNCTYPE(
Expand All @@ -846,7 +856,6 @@ def _OnWriteErrorCallback(closure, chiperror: int):
@_OnWriteDoneCallbackFunct
def _OnWriteDoneCallback(closure):
closure.handleDone()
ctypes.pythonapi.Py_DecRef(ctypes.py_object(closure))


def WriteAttributes(future: Future, eventLoop, device, attributes: List[AttributeWriteRequest], timedRequestTimeoutMs: int = None) -> int:
Expand Down