diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index 59849058..d1542796 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -7,7 +7,7 @@ import json import time import uuid -from typing import Any +from typing import Any, Awaitable from jupyter_server.auth import authorized from jupyter_server.base.handlers import APIHandler, JupyterHandler @@ -70,52 +70,70 @@ def create_task(self, aw): task.add_done_callback(self._background_tasks.discard) async def prepare(self): - if not self._websocket_server.started.is_set(): - self.create_task(self._websocket_server.start()) - await self._websocket_server.started.wait() - - # Get room - self._room_id: str = room_id_from_encoded_path(self.request.path) - - async with self._room_lock(self._room_id): - if self._websocket_server.room_exists(self._room_id): - self.room: YRoom = await self._websocket_server.get_room(self._room_id) - else: - if self._room_id.count(":") >= 2: - # DocumentRoom - file_format, file_type, file_id = decode_file_path(self._room_id) - if file_id in self._file_loaders: - self._emit( - LogLevel.WARNING, - None, - "There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.", + res = super().prepare() + if isinstance(res, Awaitable): + await res + if not self._websocket_server.started.is_set(): + self.create_task(self._websocket_server.start()) + await self._websocket_server.started.wait() + + # Get room + self._room_id: str = room_id_from_encoded_path(self.request.path) + + async with self._room_lock(self._room_id): + if self._websocket_server.room_exists(self._room_id): + self.room: YRoom = await self._websocket_server.get_room(self._room_id) + else: + if self._room_id.count(":") >= 2: + # DocumentRoom + file_format, file_type, file_id = decode_file_path(self._room_id) + if file_id in self._file_loaders: + self._emit( + LogLevel.WARNING, + None, + "There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.", + ) + + file = self._file_loaders[file_id] + updates_file_path = f".{file_type}:{file_id}.y" + ystore = self._ystore_class(path=updates_file_path, log=self.log) + self.room = DocumentRoom( + self._room_id, + file_format, + file_type, + file, + self.event_logger, + ystore, + self.log, + self._document_save_delay, ) + else: + # TransientRoom + # it is a transient document (e.g. awareness) + self.room = TransientRoom(self._room_id, self.log) + + try: + await self._websocket_server.start_room(self.room) + except Exception as e: + self.log.error("Room %s failed to start on websocket server", self._room_id) + # Clean room + self.room.stop() + self.log.info("Room %s deleted", self._room_id) + self._emit(LogLevel.INFO, "clean", "Room deleted.") + + # if websocket server failed, then room and file will be garbage collected and file + # Clean the file loader in file loader mapping if there are not rooms using it + _, _, file_id = decode_file_path(self._room_id) file = self._file_loaders[file_id] - updates_file_path = f".{file_type}:{file_id}.y" - ystore = self._ystore_class(path=updates_file_path, log=self.log) - self.room = DocumentRoom( - self._room_id, - file_format, - file_type, - file, - self.event_logger, - ystore, - self.log, - self._document_save_delay, - ) - - else: - # TransientRoom - # it is a transient document (e.g. awareness) - self.room = TransientRoom(self._room_id, self.log) - - await self._websocket_server.start_room(self.room) - self._websocket_server.add_room(self._room_id, self.room) - - res = super().prepare() - if res is not None: - return await res + if file.number_of_subscriptions == 0 or ( + file.number_of_subscriptions == 1 and self._room_id in file._subscriptions + ): + self.log.info("Deleting file %s", file.path) + await self._file_loaders.remove(file_id) + self._emit(LogLevel.INFO, "clean", "file loader removed.") + raise e + self._websocket_server.add_room(self._room_id, self.room) def initialize( self, @@ -292,13 +310,15 @@ def on_close(self) -> None: """ # stop serving this client self._message_queue.put_nowait(b"") - if isinstance(self.room, DocumentRoom) and self.room.clients == [self]: - # no client in this room after we disconnect - # keep the document for a while in case someone reconnects - self.log.info("Cleaning room: %s", self._room_id) - self.room.cleaner = asyncio.create_task(self._clean_room()) - if self._room_id != "JupyterLab:globalAwareness": - self._emit_awareness_event(self.current_user.username, "leave") + if hasattr(self, "room"): + if isinstance(self.room, DocumentRoom) and self.room.clients == [self]: + # no client in this room after we disconnect + # keep the document for a while in case someone reconnects + self.log.info("Cleaning room: %s", self._room_id) + self.room.cleaner = asyncio.create_task(self._clean_room()) + if hasattr(self, "_room_id"): + if self._room_id != "JupyterLab:globalAwareness": + self._emit_awareness_event(self.current_user.username, "leave") def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: _, _, file_id = decode_file_path(self._room_id) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py index 62cc0a0a..a2622cca 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py @@ -74,7 +74,10 @@ async def clean(self) -> None: if self._watcher is not None: if not self._watcher.cancelled(): self._watcher.cancel() - await self._watcher + try: + await self._watcher + except asyncio.CancelledError: + self._log.info(f"file watcher for '{self.file_id}' is cancelled now") def observe(self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]) -> None: """ diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py index e88e6d39..f90b56ce 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py @@ -161,7 +161,10 @@ def stop(self) -> None: Cancels the save task and unsubscribes from the file. """ - super().stop() + try: + super().stop() + except RuntimeError: + pass # TODO: Should we cancel or wait ? if self._saving_document: self._saving_document.cancel() @@ -299,3 +302,12 @@ async def _broadcast_updates(self): await super()._broadcast_updates() except asyncio.CancelledError: pass + + def stop(self) -> None: + """ + Stop the room. + """ + try: + super().stop() + except RuntimeError: + pass