Skip to content

Commit

Permalink
fix: 'ClientConnection' object has no attribute 'closed' (#1613)
Browse files Browse the repository at this point in the history
* fix: 'ClientConnection' object has no attribute 'closed'

* Fix an issue with old version of websockets

* Update slack_sdk/socket_mode/websockets/__init__.py

Co-authored-by: Kazuhiro Sera <seratch@gmail.com>

* Add more CI coverage round this issue

* Revert flaky tests

---------

Co-authored-by: Kazuhiro Sera <seratch@gmail.com>
  • Loading branch information
WilliamBergamin and seratch authored Dec 13, 2024
1 parent a7223d9 commit 5c57a32
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions slack_sdk/socket_mode/websockets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* https://pypi.org/project/websockets/
"""

import asyncio
import logging
from asyncio import Future, Lock
Expand All @@ -15,8 +16,13 @@
import websockets
from websockets.exceptions import WebSocketException

# To keep compatibility with websockets 8.x, we use this import over .legacy.client
from websockets import WebSocketClientProtocol
try:
from websockets.asyncio.client import ClientConnection
except ImportError:
# To keep compatibility with websockets <14.x we use WebSocketClientProtocol
# To keep compatibility with websockets 8.x, we use this import over .legacy.client
from websockets import WebSocketClientProtocol as ClientConnection


from slack_sdk.socket_mode.async_client import AsyncBaseSocketModeClient
from slack_sdk.socket_mode.async_listeners import (
Expand All @@ -29,6 +35,17 @@
from ..logger.messages import debug_redacted_message_string


def _session_closed(session: Optional[ClientConnection]) -> bool:
if session is None:
return True
if hasattr(session, "closed"):
# The session is a WebSocketClientProtocol instance
return session.closed
# WebSocket close code, defined in https://datatracker.ietf.org/doc/html/rfc6455.html#section-7.1.5
# None if the connection isn’t closed yet.
return session.close_code is not None


class SocketModeClient(AsyncBaseSocketModeClient):
logger: Logger
web_client: AsyncWebClient
Expand All @@ -55,7 +72,7 @@ class SocketModeClient(AsyncBaseSocketModeClient):
ping_interval: float
trace_enabled: bool

current_session: Optional[WebSocketClientProtocol]
current_session: Optional[ClientConnection]
current_session_monitor: Optional[Future]

auto_reconnect_enabled: bool
Expand Down Expand Up @@ -105,7 +122,7 @@ async def monitor_current_session(self) -> None:
# In the asyncio runtime, accessing a shared object (self.current_session here) from
# multiple tasks can cause race conditions and errors.
# To avoid such, we access only the session that is active when this loop starts.
session: WebSocketClientProtocol = self.current_session
session: ClientConnection = self.current_session
session_id: str = await self.session_id()
if self.logger.level <= logging.DEBUG:
self.logger.debug(f"A new monitor_current_session() execution loop for {session_id} started")
Expand All @@ -117,7 +134,7 @@ async def monitor_current_session(self) -> None:
break
await asyncio.sleep(self.ping_interval)
try:
if self.auto_reconnect_enabled and (session is None or session.closed):
if self.auto_reconnect_enabled and _session_closed(session=session):
self.logger.info(f"The session ({session_id}) seems to be already closed. Reconnecting...")
await self.connect_to_new_endpoint()
except Exception as e:
Expand All @@ -134,7 +151,7 @@ async def receive_messages(self) -> None:
# In the asyncio runtime, accessing a shared object (self.current_session here) from
# multiple tasks can cause race conditions and errors.
# To avoid such, we access only the session that is active when this loop starts.
session: WebSocketClientProtocol = self.current_session
session: ClientConnection = self.current_session
session_id: str = await self.session_id()
consecutive_error_count = 0
if self.logger.level <= logging.DEBUG:
Expand Down Expand Up @@ -171,15 +188,15 @@ async def receive_messages(self) -> None:
raise

async def is_connected(self) -> bool:
return not self.closed and self.current_session is not None and not self.current_session.closed
return not self.closed and not _session_closed(self.current_session)

async def session_id(self) -> str:
return self.build_session_id(self.current_session)

async def connect(self):
if self.wss_uri is None:
self.wss_uri = await self.issue_new_wss_url()
old_session: Optional[WebSocketClientProtocol] = None if self.current_session is None else self.current_session
old_session: Optional[ClientConnection] = None if self.current_session is None else self.current_session
# NOTE: websockets does not support proxy settings
self.current_session = await websockets.connect(
uri=self.wss_uri,
Expand Down Expand Up @@ -250,7 +267,7 @@ async def close(self):
self.message_receiver.cancel()

@classmethod
def build_session_id(cls, session: WebSocketClientProtocol) -> str:
def build_session_id(cls, session: ClientConnection) -> str:
if session is None:
return ""
return "s_" + str(hash(session))

0 comments on commit 5c57a32

Please sign in to comment.