Skip to content

Commit

Permalink
handle exception when websocket server start room failed
Browse files Browse the repository at this point in the history
  • Loading branch information
Jialin Zhang committed Apr 24, 2024
1 parent adcde32 commit a214112
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 53 deletions.
122 changes: 71 additions & 51 deletions projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import json
import time
import uuid
from typing import Any
from typing import Any, Awaitable

from jupyter_server.auth import authorized
from jupyter_server.base.handlers import APIHandler, JupyterHandler
Expand Down Expand Up @@ -70,52 +70,70 @@ def create_task(self, aw):
task.add_done_callback(self._background_tasks.discard)

async def prepare(self):
if not self._websocket_server.started.is_set():
self.create_task(self._websocket_server.start())
await self._websocket_server.started.wait()

# Get room
self._room_id: str = room_id_from_encoded_path(self.request.path)

async with self._room_lock(self._room_id):
if self._websocket_server.room_exists(self._room_id):
self.room: YRoom = await self._websocket_server.get_room(self._room_id)
else:
if self._room_id.count(":") >= 2:
# DocumentRoom
file_format, file_type, file_id = decode_file_path(self._room_id)
if file_id in self._file_loaders:
self._emit(
LogLevel.WARNING,
None,
"There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.",
res = super().prepare()
if isinstance(res, Awaitable):
await res
if not self._websocket_server.started.is_set():
self.create_task(self._websocket_server.start())
await self._websocket_server.started.wait()

# Get room
self._room_id: str = room_id_from_encoded_path(self.request.path)

async with self._room_lock(self._room_id):
if self._websocket_server.room_exists(self._room_id):
self.room: YRoom = await self._websocket_server.get_room(self._room_id)
else:
if self._room_id.count(":") >= 2:
# DocumentRoom
file_format, file_type, file_id = decode_file_path(self._room_id)
if file_id in self._file_loaders:
self._emit(
LogLevel.WARNING,
None,
"There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.",
)

file = self._file_loaders[file_id]
updates_file_path = f".{file_type}:{file_id}.y"
ystore = self._ystore_class(path=updates_file_path, log=self.log)
self.room = DocumentRoom(
self._room_id,
file_format,
file_type,
file,
self.event_logger,
ystore,
self.log,
self._document_save_delay,
)

else:
# TransientRoom
# it is a transient document (e.g. awareness)
self.room = TransientRoom(self._room_id, self.log)

try:
await self._websocket_server.start_room(self.room)
except Exception as e:
self.log.error("Room %s failed to start on websocket server", self._room_id)
# Clean room
self.room.stop()
self.log.info("Room %s deleted", self._room_id)
self._emit(LogLevel.INFO, "clean", "Room deleted.")

# if websocket server failed, then room and file will be garbage collected and file
# Clean the file loader in file loader mapping if there are not rooms using it
_, _, file_id = decode_file_path(self._room_id)
file = self._file_loaders[file_id]
updates_file_path = f".{file_type}:{file_id}.y"
ystore = self._ystore_class(path=updates_file_path, log=self.log)
self.room = DocumentRoom(
self._room_id,
file_format,
file_type,
file,
self.event_logger,
ystore,
self.log,
self._document_save_delay,
)

else:
# TransientRoom
# it is a transient document (e.g. awareness)
self.room = TransientRoom(self._room_id, self.log)

await self._websocket_server.start_room(self.room)
self._websocket_server.add_room(self._room_id, self.room)

res = super().prepare()
if res is not None:
return await res
if file.number_of_subscriptions == 0 or (
file.number_of_subscriptions == 1 and self._room_id in file._subscriptions
):
self.log.info("Deleting file %s", file.path)
await self._file_loaders.remove(file_id)
self._emit(LogLevel.INFO, "clean", "file loader removed.")
raise e
self._websocket_server.add_room(self._room_id, self.room)

def initialize(
self,
Expand Down Expand Up @@ -292,13 +310,15 @@ def on_close(self) -> None:
"""
# stop serving this client
self._message_queue.put_nowait(b"")
if isinstance(self.room, DocumentRoom) and self.room.clients == [self]:
# no client in this room after we disconnect
# keep the document for a while in case someone reconnects
self.log.info("Cleaning room: %s", self._room_id)
self.room.cleaner = asyncio.create_task(self._clean_room())
if self._room_id != "JupyterLab:globalAwareness":
self._emit_awareness_event(self.current_user.username, "leave")
if hasattr(self, "room"):
if isinstance(self.room, DocumentRoom) and self.room.clients == [self]:
# no client in this room after we disconnect
# keep the document for a while in case someone reconnects
self.log.info("Cleaning room: %s", self._room_id)
self.room.cleaner = asyncio.create_task(self._clean_room())
if hasattr(self, "_room_id"):
if self._room_id != "JupyterLab:globalAwareness":
self._emit_awareness_event(self.current_user.username, "leave")

def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
_, _, file_id = decode_file_path(self._room_id)
Expand Down
5 changes: 4 additions & 1 deletion projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ async def clean(self) -> None:
if self._watcher is not None:
if not self._watcher.cancelled():
self._watcher.cancel()
await self._watcher
try:
await self._watcher
except asyncio.CancelledError:
self._log.info(f"file watcher for '{self.file_id}' is cancelled now")

def observe(self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]) -> None:
"""
Expand Down
14 changes: 13 additions & 1 deletion projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ def stop(self) -> None:
Cancels the save task and unsubscribes from the file.
"""
super().stop()
try:
super().stop()
except RuntimeError:
pass
# TODO: Should we cancel or wait ?
if self._saving_document:
self._saving_document.cancel()
Expand Down Expand Up @@ -299,3 +302,12 @@ async def _broadcast_updates(self):
await super()._broadcast_updates()
except asyncio.CancelledError:
pass

def stop(self) -> None:
"""
Stop the room.
"""
try:
super().stop()
except RuntimeError:
pass

0 comments on commit a214112

Please sign in to comment.