Skip to content

Commit

Permalink
Fixes sync between rooms
Browse files Browse the repository at this point in the history
  • Loading branch information
hbcarlos committed Apr 7, 2023
1 parent 78c340c commit 1ee3848
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 41 deletions.
4 changes: 2 additions & 2 deletions jupyter_collaboration/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ class YDocExtension(ExtensionApp):
name = "jupyter_collaboration"

file_poll_interval = Int(
1,
5,
config=True,
help="""The period in seconds to check for file changes on disk.
Defaults to 1s, if 0 then file changes will only be checked when
saving changes from the front-end.""",
)

document_cleanup_delay = Int(
60,
5,
allow_none=True,
config=True,
help="""The delay in seconds to keep a document in memory in the back-end after all clients
Expand Down
17 changes: 11 additions & 6 deletions jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import asyncio
import json
import uuid
from logging import getLogger
from pathlib import Path
from typing import Any, Dict, Optional, Set

Expand Down Expand Up @@ -369,16 +370,20 @@ def clean_up(cls):
Useful to clean up tasks on server shut down.
"""
assert cls.websocket_server is not None
# Cancel tasks and clean up
# TODO: should we wait for any save task?
rooms = list(cls.websocket_server.rooms.values())
for room in rooms:
cls.websocket_server.delete_room(room=room)
log = getLogger(__name__)
log.info("Cleaning up resources before server shut down.")
if cls.websocket_server is not None:
# Cancel tasks and clean up
# TODO: should we wait for any save task?
rooms = list(cls.websocket_server.rooms.values())
log.info("Deleting rooms.")
for room in rooms:
cls.websocket_server.delete_room(room=room)

for file in cls.files.values():
file.clean()

log.info("Deleting files.")
cls.files.clear()


Expand Down
42 changes: 19 additions & 23 deletions jupyter_collaboration/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
from jupyter_server.utils import ensure_async


class OutOfBandChanges(Exception):
pass


class FileLoader:
"""
A class to centralize all the operation on a file.
Expand All @@ -25,15 +29,16 @@ def __init__(
self._file_id: str = file_id
self._file_format: str = file_format
self._file_type: str = file_type
self._last_modified = None

self._lock = asyncio.Lock()
self._poll_interval = poll_interval
self._file_id_manager = file_id_manager
self._contents_manager = contents_manager

self._log = log or getLogger(__name__)
self._subscriptions: dict[str, Callable[[str], Coroutine[Any, Any, None]]] = {}
self._subscriptions: dict[
str, Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]]
] = {}

self._watcher = asyncio.create_task(self._watch_file()) if self._poll_interval else None

Expand All @@ -60,7 +65,9 @@ def clean(self) -> None:
if self._watcher is not None:
self._watcher.cancel()

def observe(self, id: str, callback: Callable[[str], Coroutine[Any, Any, None]]) -> None:
def observe(
self, id: str, callback: Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]]
) -> None:
"""
Subscribe to the file to get notified on file changes.
Expand Down Expand Up @@ -98,12 +105,12 @@ async def load_content(self, format: str, file_type: str, content: bool) -> dict
)
)

async def save_content(self, model: dict[str, Any]) -> None:
async def save_content(self, model: dict[str, Any]) -> dict[str, Any]:
"""
Save the content of the file.
Parameters:
model (dict): A dictionary with format, type, and content of the file.
model (dict): A dictionary with format, type, last_modified, and content of the file.
"""
async with self._lock:
path = self.path
Expand All @@ -113,20 +120,13 @@ async def save_content(self, model: dict[str, Any]) -> None:
)
)

if self._last_modified is None or self._last_modified == m["last_modified"]:
if model["last_modified"] == m["last_modified"]:
self._log.info("Saving file: %s", path)
model = await ensure_async(self._contents_manager.save(model, path))
self._last_modified = model["last_modified"]
return await ensure_async(self._contents_manager.save(model, path))

else:
# file changed on disk, let's revert
self._log.info(
"Notifying rooms. Out-of-band changes while trying to save: %s", path
)
self._last_modified = model["last_modified"]
# Notify that the content changed on disk
for callback in self._subscriptions.values():
await callback("changed")
# file changed on disk, raise an error
raise OutOfBandChanges

async def _watch_file(self) -> None:
"""
Expand All @@ -153,10 +153,6 @@ async def _maybe_load_document(self) -> None:
)
)

# do nothing if the file was saved by us
if self._last_modified is not None and self._last_modified < model["last_modified"]:
self._log.info("Notifying rooms. The file on disk changed: %s", path)
self._last_modified = model["last_modified"]
# Notify that the content changed on disk
for callback in self._subscriptions.values():
await callback("changed")
# Notify that the content changed on disk
for callback in self._subscriptions.values():
await callback("metadata", model)
47 changes: 37 additions & 10 deletions jupyter_collaboration/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ypy_websocket.websocket_server import YRoom
from ypy_websocket.ystore import BaseYStore, YDocNotFound

from .loaders import FileLoader
from .loaders import FileLoader, OutOfBandChanges

YFILE = YDOCS["file"]

Expand All @@ -31,6 +31,7 @@ def __init__(
self._room_id: str = room_id
self._file_format: str = file_format
self._file_type: str = file_type
self._last_modified: str = None
self._file: FileLoader = file
self._document = YDOCS.get(self._file_type, YFILE)(self.ydoc)

Expand Down Expand Up @@ -121,6 +122,7 @@ async def initialize(self) -> None:
if self.ystore:
await self.ystore.encode_state_as_update(self.ydoc)

self._last_modified = model["last_modified"]
self._document.dirty = False
self.ready = True

Expand All @@ -137,18 +139,28 @@ def _clean(self) -> None:
self._document.unobserve()
self._file.unobserve(self.room_id)

async def _on_content_change(self, event: str) -> None:
async def _on_content_change(self, event: str, args: dict[str, Any]) -> None:
"""
Called when the file changes.
Parameters:
event (str): Type of change.
"""
if event == "changed":
self.log.info("Overwriting the content in room %s", self._room_id)
if event == "metadata" and self._last_modified < args["last_modified"]:
model = await self._file.load_content(self._file_format, self._file_type, True)
self._document.source = model["content"]
self._document.dirty = False

if self._document.source != model["content"]:
self.log.info(
"Out-of-band changes. Overwriting the content in room %s", self._room_id
)
self._document.source = model["content"]
self._last_modified = model["last_modified"]
self._document.dirty = False

else:
# Update last_modify because this attribute changed on disk
# even though the content did not.
self._last_modified = model["last_modified"]

def _on_document_change(self, target: str, event: Any) -> None:
"""
Expand Down Expand Up @@ -193,10 +205,25 @@ async def _maybe_save_document(self) -> None:
# save after X seconds of inactivity
await asyncio.sleep(self._save_delay)

await self._file.save_content(
{"format": self._file_format, "type": self._file_type, "content": self._document.source}
)
self._document.dirty = False
try:
self.log.info("Saving the content from room %s", self._room_id)
model = await self._file.save_content(
{
"format": self._file_format,
"type": self._file_type,
"last_modified": self._last_modified,
"content": self._document.source,
}
)
self._last_modified = model["last_modified"]
self._document.dirty = False

except OutOfBandChanges:
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
model = await self._file.load_content(self._file_format, self._file_type, True)
self._document.source = model["content"]
self._last_modified = model["last_modified"]
self._document.dirty = False


class TransientRoom(YRoom):
Expand Down

0 comments on commit 1ee3848

Please sign in to comment.