Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Avoid RuntimeException if APIs with future raise an error #34354

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 23 additions & 30 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,10 @@ def future(self) -> typing.Optional[concurrent.futures.Future]:

async def __aexit__(self, exc_type, exc_value, traceback):
if not self._future.done():
raise RuntimeError("CallbackContext future not completed")
# In case the initial call (which sets up for the callback) fails,
# the future will never be used actually. So just cancel it here
# for completeness, in case somebody is expecting it to be completed.
self._future.cancel()
self._future = None
self._lock.release()

Expand Down Expand Up @@ -603,23 +606,22 @@ async def ConnectBLE(self, discriminator: int, setupPinCode: int, nodeid: int, i

async with self._commissioning_context as ctx:
self._enablePairingCompleteCallback(True)
res = await self._ChipStack.CallAsync(
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_ConnectBLE(
self.devCtrl, discriminator, isShortDiscriminator, setupPinCode, nodeid)
)
res.raise_on_error()

return await asyncio.futures.wrap_future(ctx.future)

async def UnpairDevice(self, nodeid: int) -> None:
self.CheckIsActive()

async with self._unpair_device_context as ctx:
res = await self._ChipStack.CallAsync(
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_UnpairDevice(
self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct)
)
res.raise_on_error()

return await asyncio.futures.wrap_future(ctx.future)

def CloseBLEConnection(self):
Expand Down Expand Up @@ -656,8 +658,7 @@ async def _establishPASESession(self, callFunct):

async with self._pase_establishment_context as ctx:
self._enablePairingCompleteCallback(True)
res = await self._ChipStack.CallAsync(callFunct)
res.raise_on_error()
await self._ChipStack.CallAsync(callFunct)
await asyncio.futures.wrap_future(ctx.future)

async def EstablishPASESessionBLE(self, setupPinCode: int, discriminator: int, nodeid: int) -> None:
Expand Down Expand Up @@ -756,13 +757,12 @@ async def DiscoverCommissionableNodes(self, filterType: discovery.FilterType = d
# Discovery is also used during commissioning. Make sure this manual discovery
# and commissioning attempts do not interfere with each other.
async with self._commissioning_lock:
res = await self._ChipStack.CallAsync(
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_DiscoverCommissionableNodes(
self.devCtrl, int(filterType), str(filter).encode("utf-8")))
res.raise_on_error()

async def _wait_discovery():
while not await self._ChipStack.CallAsync(
while not await self._ChipStack.CallAsyncWithResult(
lambda: self._dmLib.pychip_DeviceController_HasDiscoveredCommissionableNode(self.devCtrl)):
await asyncio.sleep(0.1)
return
Expand All @@ -776,9 +776,8 @@ async def _wait_discovery():
# Expected timeout, do nothing
pass
finally:
res = await self._ChipStack.CallAsync(
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_StopCommissionableDiscovery(self.devCtrl))
res.raise_on_error()

return await self.GetDiscoveredDevices()

Expand All @@ -796,7 +795,7 @@ def HandleDevice(deviceJson, deviceJsonLen):
self._dmLib.pychip_DeviceController_IterateDiscoveredCommissionableNodes(devCtrl.devCtrl, HandleDevice)
return devices

return await self._ChipStack.CallAsync(lambda: GetDevices(self))
return await self._ChipStack.CallAsyncWithResult(lambda: GetDevices(self))

def GetIPForDiscoveredDevice(self, idx, addrStr, length):
self.CheckIsActive()
Expand Down Expand Up @@ -828,11 +827,10 @@ async def OpenCommissioningWindow(self, nodeid: int, timeout: int, iteration: in
self.CheckIsActive()

async with self._open_window_context as ctx:
res = await self._ChipStack.CallAsync(
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow(
self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option)
)
res.raise_on_error()

return await asyncio.futures.wrap_future(ctx.future)

Expand Down Expand Up @@ -896,14 +894,14 @@ async def FindOrEstablishPASESession(self, setupCode: str, nodeid: int, timeoutM
''' Returns CommissioneeDeviceProxy if we can find or establish a PASE connection to the specified device'''
self.CheckIsActive()
returnDevice = c_void_p(None)
res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
res = await self._ChipStack.CallAsyncWithResult(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib)

await self.EstablishPASESession(setupCode, nodeid)

res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
res = await self._ChipStack.CallAsyncWithResult(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib)
Expand Down Expand Up @@ -991,7 +989,7 @@ async def GetConnectedDevice(self, nodeid, allowPASE: bool = True, timeoutMs: in

if allowPASE:
returnDevice = c_void_p(None)
res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
res = await self._ChipStack.CallAsyncWithResult(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
LOGGER.info('Using PASE connection')
Expand Down Expand Up @@ -1021,10 +1019,9 @@ def deviceAvailable(self, device, err):

closure = DeviceAvailableClosure(eventLoop, future)
ctypes.pythonapi.Py_IncRef(ctypes.py_object(closure))
res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
self.devCtrl, nodeid, ctypes.py_object(closure), _DeviceAvailableCallback),
timeoutMs)
res.raise_on_error()

# The callback might have been received synchronously (during self._ChipStack.CallAsync()).
# In that case the Future has already been set it will return immediately
Expand Down Expand Up @@ -1917,11 +1914,10 @@ async def Commission(self, nodeid) -> int:

async with self._commissioning_context as ctx:
self._enablePairingCompleteCallback(False)
res = await self._ChipStack.CallAsync(
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_Commission(
self.devCtrl, nodeid)
)
res.raise_on_error()

return await asyncio.futures.wrap_future(ctx.future)

Expand Down Expand Up @@ -2065,11 +2061,10 @@ async def CommissionOnNetwork(self, nodeId: int, setupPinCode: int,

async with self._commissioning_context as ctx:
self._enablePairingCompleteCallback(True)
res = await self._ChipStack.CallAsync(
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission(
self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") if filter is not None else None, discoveryTimeoutMsec)
)
res.raise_on_error()

return await asyncio.futures.wrap_future(ctx.future)

Expand All @@ -2086,11 +2081,10 @@ async def CommissionWithCode(self, setupPayload: str, nodeid: int, discoveryType

async with self._commissioning_context as ctx:
self._enablePairingCompleteCallback(True)
res = await self._ChipStack.CallAsync(
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_ConnectWithCode(
self.devCtrl, setupPayload.encode("utf-8"), nodeid, discoveryType.value)
)
res.raise_on_error()

return await asyncio.futures.wrap_future(ctx.future)

Expand All @@ -2106,11 +2100,10 @@ async def CommissionIP(self, ipaddr: str, setupPinCode: int, nodeid: int) -> int

async with self._commissioning_context as ctx:
self._enablePairingCompleteCallback(True)
res = await self._ChipStack.CallAsync(
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_ConnectIP(
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid)
)
res.raise_on_error()

return await asyncio.futures.wrap_future(ctx.future)

Expand All @@ -2127,11 +2120,11 @@ async def IssueNOCChain(self, csr: Clusters.OperationalCredentials.Commands.CSRR
self.CheckIsActive()

async with self._issue_node_chain_context as ctx:
res = await self._ChipStack.CallAsync(
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_IssueNOCChain(
self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId)
)
res.raise_on_error()

return await asyncio.futures.wrap_future(ctx.future)


Expand Down
7 changes: 6 additions & 1 deletion src/controller/python/chip/ChipStack.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def Call(self, callFunct, timeoutMs: int = None):
'''
return self.PostTaskOnChipThread(callFunct).Wait(timeoutMs)

async def CallAsync(self, callFunct, timeoutMs: int = None):
async def CallAsyncWithResult(self, callFunct, timeoutMs: int = None):
'''Run a Python function on CHIP stack, and wait for the response.
This function will post a task on CHIP mainloop and waits for the call response in a asyncio friendly manner.
'''
Expand All @@ -232,6 +232,11 @@ async def CallAsync(self, callFunct, timeoutMs: int = None):

return await asyncio.wait_for(callObj.future, timeoutMs / 1000 if timeoutMs else None)

async def CallAsync(self, callFunct, timeoutMs: int = None) -> None:
'''Run a Python function on CHIP stack, and wait for the response.'''
res: PyChipError = await self.CallAsyncWithResult(callFunct, timeoutMs)
res.raise_on_error()

def PostTaskOnChipThread(self, callFunct) -> AsyncCallableHandle:
'''Run a Python function on CHIP stack, and wait for the response.
This function will post a task on CHIP mainloop, and return an object with Wait() method for getting the result.
Expand Down
2 changes: 1 addition & 1 deletion src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ def OverrideLivenessTimeoutMs(self, timeoutMs: int):

async def TriggerResubscribeIfScheduled(self, reason: str):
handle = chip.native.GetLibraryHandle()
await builtins.chipStack.CallAsync(
await builtins.chipStack.CallAsyncWithResult(
lambda: handle.pychip_ReadClient_TriggerResubscribeIfScheduled(
self._readTransaction._pReadClient, reason.encode("utf-8"))
)
Expand Down
4 changes: 2 additions & 2 deletions src/controller/python/chip/clusters/Command.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ async def SendCommand(future: Future, eventLoop, responseType: Type, device, com

payloadTLV = payload.ToTLV()
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
return await builtins.chipStack.CallAsync(
return await builtins.chipStack.CallAsyncWithResult(
lambda: handle.pychip_CommandSender_SendCommand(
ctypes.py_object(transaction), device,
c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), commandPath.EndpointId,
Expand Down Expand Up @@ -388,7 +388,7 @@ async def SendBatchCommands(future: Future, eventLoop, device, commands: List[In
transaction = AsyncBatchCommandsTransaction(future, eventLoop, responseTypes)
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))

return await builtins.chipStack.CallAsync(
return await builtins.chipStack.CallAsyncWithResult(
lambda: handle.pychip_CommandSender_SendBatchCommands(
py_object(transaction), device,
c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs),
Expand Down
Loading