From ba6d7562b913c8d8bc547fed6a84873c5c98ee53 Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Tue, 11 Apr 2023 16:23:51 +0200 Subject: [PATCH] Creates a new FileLoader class to separate the logic of watching files (#121) * Split code in multiple files * Uses new rooms with file loader * Uses the new FileLoader * Add observer pattern for notifying room * Fixes watching files and cleaning rooms * Uses locks to make sure only one client initializes the room * Clean up before shut down * Review: room lock private and scoped config variables * Review: use Logger | None * Review: save condition, load lock and iterate over subscriptions * Fix file lock * Uses file_id instead of paths * lint * Ignore type error * Lint * Review: Freds comments * Lint * Warn users about opening multiple view for the same file * Adds docstring * More docstring * Fixes sync between rooms * Lint * Improves sync * flake * Revert default settings changes --- jupyter_collaboration/app.py | 6 +- jupyter_collaboration/handlers.py | 474 +++++++++++++++-------------- jupyter_collaboration/loaders.py | 168 ++++++++++ jupyter_collaboration/rooms.py | 258 ++++++++++++++++ jupyter_collaboration/stores.py | 32 ++ jupyter_collaboration/utils.py | 33 ++ packages/docprovider/src/ydrive.ts | 22 +- pyproject.toml | 4 +- 8 files changed, 759 insertions(+), 238 deletions(-) create mode 100644 jupyter_collaboration/loaders.py create mode 100644 jupyter_collaboration/rooms.py create mode 100644 jupyter_collaboration/stores.py create mode 100644 jupyter_collaboration/utils.py diff --git a/jupyter_collaboration/app.py b/jupyter_collaboration/app.py index 202b39ad..d3cd0453 100644 --- a/jupyter_collaboration/app.py +++ b/jupyter_collaboration/app.py @@ -5,7 +5,8 @@ from traitlets import Float, Int, Type from ypy_websocket.ystore import BaseYStore -from .handlers import DocSessionHandler, SQLiteYStore, YDocWebSocketHandler +from .handlers import DocSessionHandler, YDocWebSocketHandler +from .stores import SQLiteYStore class YDocExtension(ExtensionApp): @@ -61,3 +62,6 @@ def initialize_handlers(self): (r"/api/collaboration/session/(.*)", DocSessionHandler), ] ) + + async def stop_extension(self): + YDocWebSocketHandler.clean_up() diff --git a/jupyter_collaboration/handlers.py b/jupyter_collaboration/handlers.py index e7530609..0ba1005f 100644 --- a/jupyter_collaboration/handlers.py +++ b/jupyter_collaboration/handlers.py @@ -4,73 +4,33 @@ import asyncio import json import uuid -from logging import Logger +from logging import getLogger from pathlib import Path -from typing import Any, Dict, Optional, Set, Tuple +from typing import Any, Dict, Optional, Set from jupyter_server.auth import authorized from jupyter_server.base.handlers import APIHandler, JupyterHandler -from jupyter_server.utils import ensure_async +from jupyter_server.serverapp import ServerWebApplication from jupyter_ydoc import ydocs as YDOCS from tornado import web +from tornado.httputil import HTTPServerRequest from tornado.websocket import WebSocketHandler -from traitlets import Int, Unicode -from traitlets.config import LoggingConfigurable from ypy_websocket.websocket_server import WebsocketServer, YRoom -from ypy_websocket.ystore import BaseYStore -from ypy_websocket.ystore import SQLiteYStore as _SQLiteYStore -from ypy_websocket.ystore import TempFileYStore as _TempFileYStore -from ypy_websocket.ystore import YDocNotFound from ypy_websocket.yutils import YMessageType +from .loaders import FileLoader +from .rooms import 
DocumentRoom, TransientRoom +from .utils import decode_file_path + YFILE = YDOCS["file"] SERVER_SESSION = str(uuid.uuid4()) -class TempFileYStore(_TempFileYStore): - prefix_dir = "jupyter_ystore_" - - -class SQLiteYStoreMetaclass(type(LoggingConfigurable), type(_SQLiteYStore)): # type: ignore +class RoomNotFound(Exception): pass -class SQLiteYStore(LoggingConfigurable, _SQLiteYStore, metaclass=SQLiteYStoreMetaclass): - db_path = Unicode( - ".jupyter_ystore.db", - config=True, - help="""The path to the YStore database. Defaults to '.jupyter_ystore.db' in the current - directory.""", - ) - - document_ttl = Int( - None, - allow_none=True, - config=True, - help="""The document time-to-live in seconds. Defaults to None (document history is never - cleared).""", - ) - - -class DocumentRoom(YRoom): - """A Y room for a possibly stored document (e.g. a notebook).""" - - def __init__(self, type: str, ystore: BaseYStore, log: Optional[Logger]): - super().__init__(ready=False, ystore=ystore, log=log) - self.type = type - self.cleaner: Optional["asyncio.Task[Any]"] = None - self.watcher: Optional["asyncio.Task[Any]"] = None - self.document = YDOCS.get(type, YFILE)(self.ydoc) - - -class TransientRoom(YRoom): - """A Y room for sharing state (e.g. awareness).""" - - def __init__(self, log: Optional[Logger]): - super().__init__(log=log) - - class JupyterWebsocketServer(WebsocketServer): rooms: Dict[str, YRoom] ypatch_nb: int @@ -84,9 +44,55 @@ def __init__(self, *args, **kwargs): self.ypatch_nb = 0 self.connected_users = {} self.background_tasks = set() - self.monitor_task = asyncio.create_task(self.monitor()) + self.monitor_task = asyncio.create_task(self._monitor()) + + def room_exists(self, path: str) -> bool: + """ + Returns True if the room exists, False otherwise. - async def monitor(self): + Parameters: + path (str): Room ID. + + Returns: + exists (bool): Whether the room exists or not. + """ + return path in self.rooms + + def add_room(self, path: str, room: YRoom) -> None: + """ + Adds a new room. + + Parameters: + path (str): Room ID. + room (YRoom): A room. + """ + self.rooms[path] = room + + def get_room(self, path: str) -> YRoom: + """ + Returns the room for the specified room ID or raises a RoomNotFound + error if the room doesn't exist. + + Parameters: + path (str): Room ID. + + Returns: + room (YRoom): The room. + """ + if path not in self.rooms: + # Document rooms need a file + raise RoomNotFound + + return self.rooms[path] + + async def _monitor(self): + """ + An infinite loop with a 60-second delay that counts the number + of patches processed per minute and the number of connected clients. + + #### Note: + This method runs in a coroutine for debugging purposes. + """ while True: await asyncio.sleep(60) clients_nb = sum(len(room.clients) for room in self.rooms.values()) @@ -94,20 +100,6 @@ async def monitor(self): self.log.info("Connected Y users: %s", clients_nb) self.ypatch_nb = 0 - def get_room(self, path: str) -> YRoom: - if path not in self.rooms: - if path.count(":") >= 2: - # it is a stored document (e.g. a notebook) - file_format, file_type, file_path = path.split(":", 2) - p = Path(file_path) - updates_file_path = str(p.parent / f".{file_type}:{p.name}.y") - ystore = self.ystore_class(path=updates_file_path, log=self.log) - self.rooms[path] = DocumentRoom(file_type, ystore, self.log) - else: - # it is a transient document (e.g. 
awareness) - self.rooms[path] = TransientRoom(self.log) - return self.rooms[path] - class YDocWebSocketHandler(WebSocketHandler, JupyterHandler): """`YDocWebSocketHandler` uses the singleton pattern for ``WebsocketServer``, @@ -129,13 +121,98 @@ class YDocWebSocketHandler(WebSocketHandler, JupyterHandler): receiving a message. """ - saving_document: Optional["asyncio.Task[Any]"] + files: Dict[str, FileLoader] = {} websocket_server: Optional[JupyterWebsocketServer] = None _message_queue: "asyncio.Queue[Any]" - # Override max_message size to 1GB + def __init__( + self, app: ServerWebApplication, request: HTTPServerRequest, **kwargs: Dict[str, Any] + ): + super().__init__(app, request, **kwargs) + + # CONFIG + file_id_manager = self.settings["file_id_manager"] + ystore_class = self.settings["collaborative_ystore_class"] + self._cleanup_delay = self.settings["collaborative_document_cleanup_delay"] + # self.settings["collaborative_file_poll_interval"] + # self.settings["collaborative_document_save_delay"] + + # Instantiate the JupyterWebsocketServer as a Class property + # if it doesn't exist yet + if self.websocket_server is None: + for k, v in self.config.get(ystore_class.__name__, {}).items(): + setattr(ystore_class, k, v) + + YDocWebSocketHandler.websocket_server = JupyterWebsocketServer( + rooms_ready=False, + auto_clean_rooms=False, + ystore_class=ystore_class, + log=self.log, + ) + + assert self.websocket_server is not None + self._message_queue = asyncio.Queue() + + # Get room + self._room_id: str = request.path.split("/")[-1] + + if self.websocket_server.room_exists(self._room_id): + self.room: YRoom = self.websocket_server.get_room(self._room_id) + + else: + if self._room_id.count(":") >= 2: + # DocumentRoom + file_format, file_type, file_id = decode_file_path(self._room_id) + path = file_id_manager.get_path(file_id) + + # Instantiate the FileLoader if it doesn't exist yet + file = YDocWebSocketHandler.files.get(file_id) + if file is None: + self.log.info("Creating FileLoader for: %s", path) + file = FileLoader( + file_id, + file_format, + file_type, + file_id_manager, + self.contents_manager, + self.log, + self.settings["collaborative_file_poll_interval"], + ) + self.files[file_id] = file + + path = Path(path) + updates_file_path = str(path.parent / f".{file_type}:{path.name}.y") + ystore = ystore_class(path=updates_file_path, log=self.log) + self.room = DocumentRoom( + self._room_id, + file_format, + file_type, + file, + ystore, + self.log, + self.settings["collaborative_document_save_delay"], + ) + + else: + # TransientRoom + # it is a transient document (e.g. awareness) + self.room = TransientRoom(self._room_id, self.log) + + self.websocket_server.add_room(self._room_id, self.room) + + @property + def path(self): + """ + Returns the room id. It needs to be called 'path' for compatibility with + the WebsocketServer (websocket.path). 
+ """ + return self._room_id + @property def max_message_size(self): + """ + Override max_message size to 1GB + """ return 1024 * 1024 * 1024 def __aiter__(self): @@ -149,132 +226,42 @@ async def __anext__(self): raise StopAsyncIteration() return message - def get_file_info(self) -> Tuple[str, str, str]: - assert self.websocket_server is not None - assert isinstance(self.room, DocumentRoom) - room_name = self.websocket_server.get_room_name(self.room) - file_format: str - file_type: str - file_path: Optional[str] - file_id: str - file_format, file_type, file_id = room_name.split(":", 2) - file_path = self.settings["file_id_manager"].get_path(file_id) - if file_path is None: - raise RuntimeError(f"File {self.room.document.path} cannot be found anymore") - assert file_path is not None - if file_path != self.room.document.path: - self.log.debug( - "File with ID %s was moved from %s to %s", - file_id, - self.room.document.path, - file_path, - ) - self.room.document.path = file_path - return file_format, file_type, file_path - - def set_file_info(self, value: str) -> None: - assert self.websocket_server is not None - self.websocket_server.rename_room(value, from_room=self.room) - self.path = value # needed to be compatible with WebsocketServer (websocket.path) - async def get(self, *args, **kwargs): + """ + Overrides default behavior to check whether the client is authenticated or not. + """ if self.get_current_user() is None: self.log.warning("Couldn't authenticate WebSocket connection") raise web.HTTPError(403) return await super().get(*args, **kwargs) - async def open(self, path): - ystore_class = self.settings["collaborative_ystore_class"] - if self.websocket_server is None: - for k, v in self.config.get(ystore_class.__name__, {}).items(): - setattr(ystore_class, k, v) - YDocWebSocketHandler.websocket_server = JupyterWebsocketServer( - rooms_ready=False, - auto_clean_rooms=False, - ystore_class=ystore_class, - log=self.log, - ) - self._message_queue = asyncio.Queue() - self.lock = asyncio.Lock() + async def open(self, room_id): + """ + On connection open. 
+ """ assert self.websocket_server is not None - self.room = self.websocket_server.get_room(path) - self.set_file_info(path) - self.saving_document = None + task = asyncio.create_task(self.websocket_server.serve(self)) self.websocket_server.background_tasks.add(task) task.add_done_callback(self.websocket_server.background_tasks.discard) - # Close the connection if the document session expired - session_id = self.get_query_argument("sessionId", "") - if isinstance(self.room, DocumentRoom) and SERVER_SESSION != session_id: - self.close(1003, f"Document session {session_id} expired") + if isinstance(self.room, DocumentRoom): + # Close the connection if the document session expired + session_id = self.get_query_argument("sessionId", "") + if SERVER_SESSION != session_id: + self.close(1003, f"Document session {session_id} expired") - # cancel the deletion of the room if it was scheduled - if isinstance(self.room, DocumentRoom) and self.room.cleaner is not None: - self.room.cleaner.cancel() - - if isinstance(self.room, DocumentRoom) and not self.room.ready: - file_format, file_type, file_path = self.get_file_info() - self.log.debug("Opening Y document from disk: %s", file_path) - model = await ensure_async( - self.contents_manager.get(file_path, type=file_type, format=file_format) - ) - self.last_modified = model["last_modified"] - # check again if ready, because loading the file can be async - if not self.room.ready: - # try to apply Y updates from the YStore for this document - read_from_source = True - if self.room.ystore is not None: - try: - await self.room.ystore.apply_updates(self.room.ydoc) - read_from_source = False - except YDocNotFound: - # YDoc not found in the YStore, create the document from the source file (no change history) - pass - if not read_from_source: - # if YStore updates and source file are out-of-sync, resync updates with source - if self.room.document.source != model["content"]: - read_from_source = True - - if read_from_source: - self.room.document.source = model["content"] - if self.room.ystore: - await self.room.ystore.encode_state_as_update(self.room.ydoc) - self.room.document.dirty = False - self.room.ready = True - self.room.watcher = asyncio.create_task(self.watch_file()) - # save the document when changed - self.room.document.observe(self.on_document_change) - - async def watch_file(self): - assert isinstance(self.room, DocumentRoom) - poll_interval = self.settings["collaborative_file_poll_interval"] - if not poll_interval: - self.room.watcher = None - return - while True: - await asyncio.sleep(poll_interval) - await self.maybe_load_document() + # cancel the deletion of the room if it was scheduled + if self.room.cleaner is not None: + self.room.cleaner.cancel() - async def maybe_load_document(self): - assert isinstance(self.room, DocumentRoom) - file_format, file_type, file_path = self.get_file_info() - async with self.lock: - model = await ensure_async( - self.contents_manager.get( - file_path, content=False, type=file_type, format=file_format - ) - ) - # do nothing if the file was saved by us - if self.last_modified < model["last_modified"]: - self.log.debug("Reverting file that had out-of-band changes: %s", file_path) - model = await ensure_async( - self.contents_manager.get(file_path, type=file_type, format=file_format) - ) - self.room.document.source = model["content"] - self.last_modified = model["last_modified"] + # Initialize the room + await self.room.initialize() async def send(self, message): + """ + Send a message to the client. 
+ """ # needed to be compatible with WebsocketServer (websocket.send) try: self.write_message(message, binary=True) @@ -282,10 +269,16 @@ async def send(self, message): self.log.debug("Failed to write message", exc_info=e) async def recv(self): + """ + Receive a message from the client. + """ message = await self._message_queue.get() return message def on_message(self, message): + """ + On message receive. + """ assert self.websocket_server is not None message_type = message[0] if message_type == YMessageType.AWARENESS: @@ -312,88 +305,101 @@ def on_message(self, message): YMessageType(message_type).name, ) return skip + self._message_queue.put_nowait(message) self.websocket_server.ypatch_nb += 1 def on_close(self) -> None: + """ + On connection close. + """ # stop serving this client self._message_queue.put_nowait(b"") if isinstance(self.room, DocumentRoom) and self.room.clients == [self]: # no client in this room after we disconnect # keep the document for a while in case someone reconnects - self.room.cleaner = asyncio.create_task(self.clean_room()) + self.log.info("Cleaning room: %s", self._room_id) + self.room.cleaner = asyncio.create_task(self._clean_room()) - async def clean_room(self) -> None: + async def _clean_room(self) -> None: + """ + Async task for cleaning up the resources. + + When all the clients of a room leave, we setup a task to clean up the resources + after a certain amount of time. We need to wait a few seconds to clean up the room + because sometimes websockets unintentionally disconnect. + + During the clean up, we need to delete the room to free resources since the room + contains a copy of the document. In addition, we remove the file if there is no rooms + subscribed to it. + """ assert isinstance(self.room, DocumentRoom) - seconds = self.settings["collaborative_document_cleanup_delay"] - if seconds is None: + + if self._cleanup_delay is None: return - await asyncio.sleep(seconds) - if self.room.watcher is not None: - self.room.watcher.cancel() - self.room.document.unobserve() + + await asyncio.sleep(self._cleanup_delay) + + # Remove the room from the websocket server assert self.websocket_server is not None - file_format, file_type, file_path = self.get_file_info() - self.log.debug("Deleting Y document from memory: %s", file_path) + self.log.info("Deleting Y document from memory: %s", self.room.room_id) self.websocket_server.delete_room(room=self.room) - def on_document_change(self, target, event): - if target == "state" and "dirty" in event.keys: - dirty = event.keys["dirty"]["newValue"] - if not dirty: - # we cleared the dirty flag, nothing to save - return - if self.saving_document is not None and not self.saving_document.done(): - # the document is being saved, cancel that - self.saving_document.cancel() - self.saving_document = None - self.saving_document = asyncio.create_task(self.maybe_save_document()) - - async def maybe_save_document(self): - assert isinstance(self.room, DocumentRoom) - seconds = self.settings["collaborative_document_save_delay"] - if seconds is None: - return - # save after X seconds of inactivity - await asyncio.sleep(seconds) - # if the room cannot be found, don't save - try: - file_format, file_type, file_path = self.get_file_info() - except Exception: - return - self.log.debug("Opening Y document from disk: %s", file_path) - async with self.lock: - model = await ensure_async( - self.contents_manager.get(file_path, type=file_type, format=file_format) - ) - if self.last_modified < model["last_modified"]: - # file changed on disk, 
let's revert - self.log.debug("Reverting file that had out-of-band changes: %s", file_path) - self.room.document.source = model["content"] - self.last_modified = model["last_modified"] - return - if model["content"] != self.room.document.source: - # don't save if not needed - # this also prevents the dirty flag from bouncing between windows of - # the same document opened as different types (e.g. notebook/text editor) - model["format"] = file_format - model["content"] = self.room.document.source - self.log.debug("Saving Y document to disk: %s", file_path) - async with self.lock: - model = await ensure_async(self.contents_manager.save(model, file_path)) - self.last_modified = model["last_modified"] - self.room.document.dirty = False + # Clean room + del self.room + self.log.info("Room %s deleted", self._room_id) + + # Clean the file loader if there are not rooms using it + _, _, file_id = decode_file_path(self._room_id) + file = self.files[file_id] + if file.number_of_subscriptions == 0: + self.log.info("Deleting file %s", file.path) + file.clean() + del self.files[file_id] def check_origin(self, origin): + """ + Check origin + """ return True + @classmethod + def clean_up(cls): + """ + Class method to stop every coroutine. + + Useful to clean up tasks on server shut down. + """ + log = getLogger(__name__) + log.info("Cleaning up resources before server shut down.") + if cls.websocket_server is not None: + # Cancel tasks and clean up + # TODO: should we wait for any save task? + rooms = list(cls.websocket_server.rooms.values()) + log.info("Deleting rooms.") + for room in rooms: + cls.websocket_server.delete_room(room=room) + + for file in cls.files.values(): + file.clean() + + log.info("Deleting files.") + cls.files.clear() + class DocSessionHandler(APIHandler): + """ + Jupyter Server's handler to retrieve the document's session. + """ + auth_resource = "contents" @web.authenticated @authorized async def put(self, path): + """ + Creates a new session for a given document or returns an existing one. + """ body = json.loads(self.request.body) format = body["format"] content_type = body["type"] diff --git a/jupyter_collaboration/loaders.py b/jupyter_collaboration/loaders.py new file mode 100644 index 00000000..b84ce6b5 --- /dev/null +++ b/jupyter_collaboration/loaders.py @@ -0,0 +1,168 @@ +from __future__ import annotations + +import asyncio +from logging import Logger, getLogger +from typing import Any, Callable, Coroutine + +from jupyter_server.utils import ensure_async + + +class OutOfBandChanges(Exception): + pass + + +class FileLoader: + """ + A class to centralize all the operation on a file. + """ + + def __init__( + self, + file_id: str, + file_format: str, + file_type: str, + file_id_manager: Any, + contents_manager: Any, + log: Logger | None, + poll_interval: float | None = None, + ) -> None: + self._file_id: str = file_id + self._file_format: str = file_format + self._file_type: str = file_type + + self._lock = asyncio.Lock() + self._poll_interval = poll_interval + self._file_id_manager = file_id_manager + self._contents_manager = contents_manager + + self._log = log or getLogger(__name__) + self._subscriptions: dict[ + str, Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]] + ] = {} + + self._watcher = asyncio.create_task(self._watch_file()) if self._poll_interval else None + + @property + def path(self) -> str: + """ + The file path. 
+ """ + return self._file_id_manager.get_path(self._file_id) + + @property + def number_of_subscriptions(self) -> int: + """ + The number of rooms subscribed to this file. + """ + return len(self._subscriptions) + + def clean(self) -> None: + """ + Clean up the file. + + Stops the watch task. + """ + if self._watcher is not None: + self._watcher.cancel() + + def observe( + self, id: str, callback: Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]] + ) -> None: + """ + Subscribe to the file to get notified on file changes. + + Parameters: + id (str): Room ID + callback (Callable): Callback for notifying the room. + """ + self._subscriptions[id] = callback + + def unobserve(self, id: str) -> None: + """ + Unsubscribe from the file. + + Parameters: + id (str): Room ID + """ + del self._subscriptions[id] + + async def load_content(self, format: str, file_type: str, content: bool) -> dict[str, Any]: + """ + Load the content of the file. + + Parameters: + format (str): File format. + file_type (str): Content type. + content (bool): Whether to load the content or not. + + Returns: + model (dict): A dictionary with the metadata and content of the file. + """ + async with self._lock: + return await ensure_async( + self._contents_manager.get( + self.path, format=format, type=file_type, content=content + ) + ) + + async def save_content(self, model: dict[str, Any]) -> dict[str, Any]: + """ + Save the content of the file. + + Parameters: + model (dict): A dictionary with format, type, last_modified, and content of the file. + + Returns: + model (dict): A dictionary with the metadata and content of the file. + + ### Note: + If there are changes on disk, this method raises an OutOfBandChanges exception. + """ + async with self._lock: + path = self.path + m = await ensure_async( + self._contents_manager.get( + path, format=model["format"], type=model["type"], content=False + ) + ) + + if model["last_modified"] == m["last_modified"]: + self._log.info("Saving file: %s", path) + return await ensure_async(self._contents_manager.save(model, path)) + + else: + # file changed on disk, raise an error + raise OutOfBandChanges + + async def _watch_file(self) -> None: + """ + Async task for watching a file. + """ + self._log.info("Watching file: %s", self.path) + + if self._poll_interval is None: + return + + while True: + await asyncio.sleep(self._poll_interval) + try: + await self._maybe_load_document() + + except Exception as e: + self._log.error("Error watching file: %s", self.path, exc_info=e) + + async def _maybe_load_document(self) -> None: + """ + Notifies subscribed rooms about changes to the content of the file. 
+ """ + async with self._lock: + path = self.path + model = await ensure_async( + self._contents_manager.get( + path, format=self._file_format, type=self._file_type, content=False + ) + ) + + # Notify that the content changed on disk + for callback in self._subscriptions.values(): + await callback("metadata", model) diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py new file mode 100644 index 00000000..57d59b20 --- /dev/null +++ b/jupyter_collaboration/rooms.py @@ -0,0 +1,258 @@ +from __future__ import annotations + +import asyncio +from logging import Logger +from typing import Any + +from jupyter_ydoc import ydocs as YDOCS +from ypy_websocket.websocket_server import YRoom +from ypy_websocket.ystore import BaseYStore, YDocNotFound + +from .loaders import FileLoader, OutOfBandChanges + +YFILE = YDOCS["file"] + + +class UpdateFlag: + """ + A context manager for ignoring self updates in the document. + """ + + def __init__(self): + self._updating = False + + def __enter__(self): + self._updating = True + + def __exit__(self, exc_type, exc_val, exc_tb): + self._updating = False + + @property + def updating(self) -> bool: + return self._updating + + +class DocumentRoom(YRoom): + """A Y room for a possibly stored document (e.g. a notebook).""" + + def __init__( + self, + room_id: str, + file_format: str, + file_type: str, + file: FileLoader, + ystore: BaseYStore | None, + log: Logger | None, + save_delay: int | None = None, + ): + super().__init__(ready=False, ystore=ystore, log=log) + + self._room_id: str = room_id + self._file_format: str = file_format + self._file_type: str = file_type + self._last_modified: Any = None + self._file: FileLoader = file + self._document = YDOCS.get(self._file_type, YFILE)(self.ydoc) + + self._save_delay = save_delay + + self._lock = asyncio.Lock() + self._flag = UpdateFlag() + self._cleaner: asyncio.Task | None = None + self._saving_document: asyncio.Task | None = None + + # Listen for document changes + self._document.observe(self._on_document_change) + self._file.observe(self.room_id, self._on_content_change) + + @property + def room_id(self) -> str: + """ + The room ID. + """ + return self._room_id + + @property + def cleaner(self) -> asyncio.Task | None: + """ + The task for cleaning up the resources. + """ + return self._cleaner + + @cleaner.setter + def cleaner(self, value: asyncio.Task) -> None: + """ + Setter for the clean up task. + """ + self._cleaner = value + + async def initialize(self) -> None: + """ + Initializes the room. + + This method is thread safe so only one client can initialize the room. + + To initialize the room, we check if the content was already in the store + as a Y updates and if it is up to date with the content on disk. In this + case, we load the Y updates from the store. Otherwise, we load the content + from disk. + + ### Note: + It is important to set the ready property in the parent class (`self.ready = True`), + this setter will subscribe for updates on the shared document. 
+ """ + async with self._lock: + if self.ready: # type: ignore[has-type] + return + + self.log.info("Initializing room %s", self._room_id) + model = await self._file.load_content(self._file_format, self._file_type, True) + + with self._flag: + # try to apply Y updates from the YStore for this document + read_from_source = True + if self.ystore is not None: + try: + await self.ystore.apply_updates(self.ydoc) + self.log.info( + "Content in room %s loaded from the ystore %s", + self._room_id, + self.ystore.__class__.__name__, + ) + read_from_source = False + except YDocNotFound: + # YDoc not found in the YStore, create the document from the source file (no change history) + pass + + if not read_from_source: + # if YStore updates and source file are out-of-sync, resync updates with source + if self._document.source != model["content"]: + self.log.info( + "Content in file %s is out-of-sync with the ystore %s", + self._file.path, + self.ystore.__class__.__name__, + ) + read_from_source = True + + if read_from_source: + self.log.info( + "Content in room %s loaded from file %s", self._room_id, self._file.path + ) + self._document.source = model["content"] + + if self.ystore: + await self.ystore.encode_state_as_update(self.ydoc) + + self._last_modified = model["last_modified"] + self._document.dirty = False + self.ready = True + + def _clean(self) -> None: + """ + Cleans the rooms. + + Cancels the save task and unsubscribes from the file. + """ + super()._clean() + # TODO: Should we cancel or wait ? + if self._saving_document: + self._saving_document.cancel() + + self._document.unobserve() + self._file.unobserve(self.room_id) + + async def _on_content_change(self, event: str, args: dict[str, Any]) -> None: + """ + Called when the file changes. + + Parameters: + event (str): Type of change. + args (dict): A dictionary with format, type, last_modified. + """ + if event == "metadata" and self._last_modified < args["last_modified"]: + model = await self._file.load_content(self._file_format, self._file_type, True) + + self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id) + + with self._flag: + self._document.source = model["content"] + self._last_modified = model["last_modified"] + self._document.dirty = False + + def _on_document_change(self, target: str, event: Any) -> None: + """ + Called when the shared document changes. + + Parameters: + target (str): The name of the changed attribute. + event (Any): Changes. + + ### Note: + We auto save the content of the document every time there is a + change in it. Since we could receive a high amount of changes + in a short period of time, we need create a task for saving the + document. This tasks are debounced (60 seconds by default) so we + need to cancel previous tasks before creating a new one. + """ + if self._flag.updating: + return + + if self._saving_document is not None and not self._saving_document.done(): + # the document is being saved, cancel that + self._saving_document.cancel() + self._saving_document = None + + self._saving_document = asyncio.create_task(self._maybe_save_document()) + + async def _maybe_save_document(self) -> None: + """ + Saves the content of the document to disk. + + ### Note: + There is a save delay to debounce the save since we could receive a high + amount of changes in a short period of time. This way we can cancel the + previous save. 
+ """ + if self._save_delay is None: + return + + # save after X seconds of inactivity + await asyncio.sleep(self._save_delay) + + try: + self.log.info("Saving the content from room %s", self._room_id) + model = await self._file.save_content( + { + "format": self._file_format, + "type": self._file_type, + "last_modified": self._last_modified, + "content": self._document.source, + } + ) + self._last_modified = model["last_modified"] + with self._flag: + self._document.dirty = False + + except OutOfBandChanges: + self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id) + model = await self._file.load_content(self._file_format, self._file_type, True) + with self._flag: + self._document.source = model["content"] + self._last_modified = model["last_modified"] + self._document.dirty = False + + +class TransientRoom(YRoom): + """A Y room for sharing state (e.g. awareness).""" + + def __init__(self, room_id: str, log: Logger | None): + super().__init__(log=log) + + self._room_id = room_id + + @property + def room_id(self) -> str: + """ + The room ID. + """ + return self._room_id diff --git a/jupyter_collaboration/stores.py b/jupyter_collaboration/stores.py new file mode 100644 index 00000000..323e06ed --- /dev/null +++ b/jupyter_collaboration/stores.py @@ -0,0 +1,32 @@ +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +from traitlets import Int, Unicode +from traitlets.config import LoggingConfigurable +from ypy_websocket.ystore import SQLiteYStore as _SQLiteYStore +from ypy_websocket.ystore import TempFileYStore as _TempFileYStore + + +class TempFileYStore(_TempFileYStore): + prefix_dir = "jupyter_ystore_" + + +class SQLiteYStoreMetaclass(type(LoggingConfigurable), type(_SQLiteYStore)): # type: ignore + pass + + +class SQLiteYStore(LoggingConfigurable, _SQLiteYStore, metaclass=SQLiteYStoreMetaclass): + db_path = Unicode( + ".jupyter_ystore.db", + config=True, + help="""The path to the YStore database. Defaults to '.jupyter_ystore.db' in the current + directory.""", + ) + + document_ttl = Int( + None, + allow_none=True, + config=True, + help="""The document time-to-live in seconds. Defaults to None (document history is never + cleared).""", + ) diff --git a/jupyter_collaboration/utils.py b/jupyter_collaboration/utils.py new file mode 100644 index 00000000..2dcaf529 --- /dev/null +++ b/jupyter_collaboration/utils.py @@ -0,0 +1,33 @@ +from typing import Tuple + + +def decode_file_path(path: str) -> Tuple[str, str, str]: + """ + Decodes a file path. The file path is composed by the format, + content type, and path or file id separated by ':'. + + Parameters: + path (str): File path. + + Returns: + components (Tuple[str, str, str]): A tuple with the format, + content type, and path or file id. + """ + format, file_type, file_id = path.split(":", 2) + return (format, file_type, file_id) + + +def encode_file_path(format: str, file_type: str, file_id: str) -> str: + """ + Encodes a file path. The file path is composed by the format, + content type, and path or file id separated by ':'. + + Parameters: + format (str): File format. + type (str): Content type. + path (str): Path or file id. + + Returns: + path (str): File path. 
+ """ + return f"{format}:{file_type}:{file_id}" diff --git a/packages/docprovider/src/ydrive.ts b/packages/docprovider/src/ydrive.ts index 8fa6761a..09998ad1 100644 --- a/packages/docprovider/src/ydrive.ts +++ b/packages/docprovider/src/ydrive.ts @@ -1,10 +1,13 @@ // Copyright (c) Jupyter Development Team. // Distributed under the terms of the Modified BSD License. -import { DocumentChange, ISharedDocument, YDocument } from '@jupyter/ydoc'; +import { showDialog, Dialog } from '@jupyterlab/apputils'; import { URLExt } from '@jupyterlab/coreutils'; import { TranslationBundle } from '@jupyterlab/translation'; import { Contents, Drive, User } from '@jupyterlab/services'; + +import { DocumentChange, ISharedDocument, YDocument } from '@jupyter/ydoc'; + import { WebSocketProvider } from './yprovider'; import { ICollaborativeDrive, @@ -138,6 +141,23 @@ export class YDrive extends Drive implements ICollaborativeDrive { this._providers.delete(key); } }); + + for (const provider of this._providers.keys()) { + if (provider === key) { + continue; + } + const path = provider.split(':')[2]; + + if (options.path === path) { + showDialog({ + title: this._trans.__('Warning'), + body: this._trans.__( + 'Opening a document with multiple views simultaneously is not supported. Please close one view; otherwise, you might lose some of your changes.' + ), + buttons: [Dialog.okButton()] + }); + } + } } catch (error) { // Falling back to the contents API if opening the websocket failed // This may happen if the shared document is not a YDocument. diff --git a/pyproject.toml b/pyproject.toml index c6782200..dc80d702 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -141,7 +141,7 @@ exclude=[ "^binder/jupyter_config\\.py$", ] check_untyped_defs = true -disallow_any_generics = true +disallow_any_generics = false disallow_incomplete_defs = true disallow_untyped_decorators = true no_implicit_optional = true @@ -153,7 +153,7 @@ strict_equality = true strict_optional = true warn_unused_configs = true warn_redundant_casts = true -warn_return_any = true +warn_return_any = false warn_unused_ignores = true ignore_missing_imports = true
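The core idea of this patch is an observer contract: a room subscribes to a FileLoader with observe(room_id, callback), and the loader's polling task awaits each callback with ("metadata", model) so that DocumentRoom._on_content_change can compare last_modified timestamps and reload on out-of-band changes. The sketch below is an illustration of that contract using plain asyncio only; it is not code from this repository. MiniFileLoader, on_content_change, and the fake model dictionary are hypothetical stand-ins for the real FileLoader, contents manager, and file ID manager.

# Minimal sketch of the FileLoader observer contract (assumed shape, not the real API).
import asyncio
from typing import Any, Awaitable, Callable


class MiniFileLoader:
    """Polls a file and notifies subscribed rooms, mirroring FileLoader.observe()."""

    def __init__(self, poll_interval: float) -> None:
        self._subscriptions: dict[str, Callable[[str, dict], Awaitable[None]]] = {}
        self._poll_interval = poll_interval
        # Same pattern as the patch: the watcher task is created on construction.
        self._watcher = asyncio.create_task(self._watch_file())

    def observe(self, room_id: str, callback: Callable[[str, dict], Awaitable[None]]) -> None:
        # Register a room callback keyed by room ID.
        self._subscriptions[room_id] = callback

    def clean(self) -> None:
        # Stop the watcher task, as FileLoader.clean() does.
        self._watcher.cancel()

    async def _watch_file(self) -> None:
        while True:
            await asyncio.sleep(self._poll_interval)
            # Stand-in for contents_manager.get(path, content=False): metadata only.
            model: dict[str, Any] = {"last_modified": asyncio.get_running_loop().time()}
            for callback in self._subscriptions.values():
                await callback("metadata", model)


async def on_content_change(event: str, model: dict) -> None:
    # A DocumentRoom would compare model["last_modified"] with its own timestamp
    # and reload the document source on out-of-band changes.
    print("file may have changed on disk:", event, model["last_modified"])


async def main() -> None:
    loader = MiniFileLoader(poll_interval=0.5)
    loader.observe("text:file:some-file-id", on_content_change)
    await asyncio.sleep(2)  # let a few polls fire
    loader.clean()


if __name__ == "__main__":
    asyncio.run(main())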