Skip to content

Commit

Permalink
test: add tests for symlink creation
Browse files Browse the repository at this point in the history
I found it a bit hard to change the code here without tests, so I added some.
This also found a couple of bugs:

- `update_item_folder` was causing some weird behaviours with files in the
  rclone root, and also when `alternative_folder` was used. I can't see anywhere
  that's required, so I removed it.
- There were two different, conflicting ways of finding the media in the
  rclone path. I removed the more simplistic one.

I also added some type checks / empty value checks and marked private methods as
such.
  • Loading branch information
the-eversio authored and Gaisberg committed Oct 3, 2024
1 parent 0139ddd commit 667e884
Show file tree
Hide file tree
Showing 2 changed files with 247 additions and 26 deletions.
45 changes: 19 additions & 26 deletions src/program/symlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ def validate(self):
if not library_path.is_absolute():
logger.error(f"library_path is not an absolute path: {library_path}")
return False
return self.create_initial_folders()
return self._create_initial_folders()

def create_initial_folders(self):
def _create_initial_folders(self):
"""Create the initial library folders."""
try:
self.library_path_movies = self.settings.library_path / "movies"
Expand Down Expand Up @@ -90,40 +90,39 @@ def create_initial_folders(self):

def run(self, item: Union[Movie, Show, Season, Episode]):
"""Check if the media item exists and create a symlink if it does"""
items = self.get_items_to_update(item)
if not self.should_submit(items):
items = self._get_items_to_update(item)
if not self._should_submit(items):
if item.symlinked_times == 5:
logger.debug(f"Soft resetting {item.log_string} because required files were not found")
item.reset(True)
yield item
next_attempt = self.calculate_next_attempt(item)
next_attempt = self._calculate_next_attempt(item)
logger.debug(f"Waiting for {item.log_string} to become available, next attempt in {round((next_attempt - datetime.now()).total_seconds())} seconds")
item.symlinked_times += 1
yield (item, next_attempt)
try:
for _item in items:
self.update_item_folder(_item)
self._symlink(_item)
logger.log("SYMLINKER", f"Symlinks created for {item.log_string}")
except Exception as e:
logger.error(f"Exception thrown when creating symlink for {item.log_string}: {e}")
yield item

def calculate_next_attempt(self, item: Union[Movie, Show, Season, Episode]) -> datetime:
def _calculate_next_attempt(self, item: Union[Movie, Show, Season, Episode]) -> datetime:
base_delay = timedelta(seconds=5)
next_attempt_delay = base_delay * (2 ** item.symlinked_times)
next_attempt_time = datetime.now() + next_attempt_delay
return next_attempt_time

def should_submit(self, items: Union[Movie, Show, Season, Episode]) -> bool:
def _should_submit(self, items: Union[Movie, Show, Season, Episode]) -> bool:
"""Check if the item should be submitted for symlink creation."""
random_item = random.choice(items)
if not get_item_path(random_item):
if not _get_item_path(random_item):
return False
else:
return True

def get_items_to_update(self, item: Union[Movie, Show, Season, Episode]) -> List[Union[Movie, Episode]]:
def _get_items_to_update(self, item: Union[Movie, Show, Season, Episode]) -> List[Union[Movie, Episode]]:
items = []
if item.type in ["episode", "movie"]:
items.append(item)
Expand All @@ -139,27 +138,19 @@ def get_items_to_update(self, item: Union[Movie, Show, Season, Episode]) -> List
items.append(episode)
return items

def update_item_folder(self, item: Union[Movie, Episode]):
path = get_item_path(item)
if path:
item.set("folder", path.parent.name)

def symlink(self, item: Union[Movie, Episode]) -> bool:
"""Create a symlink for the given media item if it does not already exist."""
return self._symlink(item)

def _symlink(self, item: Union[Movie, Episode]) -> bool:
"""Create a symlink for the given media item if it does not already exist."""
if not item:
logger.error("Invalid item sent to Symlinker: None")
return False

if item.file is None:
logger.error(f"Item file is None for {item.log_string}, cannot create symlink.")
logger.error(f"Invalid item sent to Symlinker: {item}")
return False

if not item.folder:
logger.error(f"Item folder is None for {item.log_string}, cannot create symlink.")
source = _get_item_path(item)
if not source:
logger.error(f"Could not find path for {item.log_string}, cannot create symlink.")
return False

filename = self._determine_file_name(item)
Expand All @@ -170,7 +161,6 @@ def _symlink(self, item: Union[Movie, Episode]) -> bool:
extension = os.path.splitext(item.file)[1][1:]
symlink_filename = f"{filename}.{extension}"
destination = self._create_item_folders(item, symlink_filename)
source = os.path.join(self.rclone_path, item.folder, item.file)

try:
if os.path.islink(destination):
Expand All @@ -189,7 +179,7 @@ def _symlink(self, item: Union[Movie, Episode]) -> bool:
logger.error(f"OS error when creating symlink for {item.log_string}: {e}")
return False

if os.readlink(destination) != source:
if Path(destination).readlink() != source:
logger.error(f"Symlink validation failed: {destination} does not point to {source} for {item.log_string}")
return False

Expand Down Expand Up @@ -243,7 +233,7 @@ def create_folder_path(base_path, *subfolders):

return os.path.join(destination_folder, filename.replace("/", "-"))

def _determine_file_name(self, item) -> str | None:
def _determine_file_name(self, item: Union[Movie, Episode]) -> str | None:
"""Determine the filename of the symlink."""
filename = None
if isinstance(item, Movie):
Expand Down Expand Up @@ -297,8 +287,11 @@ def _delete_symlink(item: Union[Movie, Show], item_path: Path) -> bool:
logger.error(f"Failed to delete symlink for {item.log_string}, error: {e}")
return False

def get_item_path(item: Union[Movie, Episode]) -> Optional[Path]:
def _get_item_path(item: Union[Movie, Episode]) -> Optional[Path]:
"""Quickly check if the file exists in the rclone path."""
if not item.file:
return None

rclone_path = Path(settings_manager.settings.symlink.rclone_path)
possible_folders = [item.folder, item.file, item.alternative_folder]

Expand Down
228 changes: 228 additions & 0 deletions src/tests/test_symlink_creation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
from pathlib import Path
from datetime import datetime
from typing import Optional, Union

import pytest
import shutil
import copy
from pyfakefs.fake_filesystem_unittest import Patcher

from program.symlink import Symlinker
from program.media.item import Movie, Show, Season, Episode
from program.media.state import States
from program.settings.manager import settings_manager

from sqlalchemy import create_engine
from sqlalchemy.engine import URL
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

from utils.logger import logger

logger.disable("program") # Suppress

Base = declarative_base()
url = URL.create(
drivername="postgresql",
username="coderpad",
host="/tmp/postgresql/socket",
database="coderpad",
)
engine = create_engine(url)
Session = sessionmaker(bind=engine)


@pytest.fixture(scope="module")
def db_session():
Base.metadata.create_all(engine)
session = Session()
yield session
session.rollback()
session.close()


@pytest.fixture(scope="module")
def movie():
movie = Movie({})
movie.title = "Riven"
movie.aired_at = datetime(2020, 1, 1)
movie.imdb_id = "tt18278776"
return movie


@pytest.fixture(scope="module")
def episode():
show = Show({})
show.title = "Big Art"
show.aired_at = datetime(2015, 1, 1)
show.imdb_id = "tt4667710"

season = Season({})
season.title = "Season 01"
season.parent = show
season.number = 1

episode = Episode({})
episode.title = "S01E06 Riven with Fire"
episode.parent = season
episode.number = 6
episode.imdb_id = "tt14496350"
return episode


class MockSettings:
def __init__(self, library_path, rclone_path):
self.force_refresh = False
self.symlink = type(
"symlink",
(),
{
"library_path": Path(library_path),
"rclone_path": Path(rclone_path),
"separate_anime_dirs": True,
},
)


@pytest.fixture
def symlinker(fs):
library_path = "/fake/library"
fs.create_dir(f"{library_path}")

rclone_path = "/fake/rclone"
fs.create_dir(f"{rclone_path}")

settings_manager.settings = MockSettings(library_path, rclone_path)
return Symlinker()


def test_valid_symlinker(symlinker):
assert symlinker.initialized, "Library should be initialized successfully."
assert symlinker.library_path_movies.exists()
assert symlinker.library_path_shows.exists()
assert symlinker.library_path_anime_movies.exists()
assert symlinker.library_path_anime_shows.exists()


def test_invalid_library_structure(fs):
valid_path = "/valid"
invalid_path = "/invalid"
fs.create_dir(invalid_path)

# Invalid library path
settings_manager.settings = MockSettings(invalid_path, valid_path)
library = Symlinker()
assert (
not library.initialized
), "Library should fail initialization with incorrect structure."

# invalid rclone path
settings_manager.settings = MockSettings(valid_path, invalid_path)
library = Symlinker()
assert (
not library.initialized
), "Library should fail initialization with incorrect structure."


def test_symlink_create_invalid_item(symlinker):
assert symlinker.symlink(None) is False
assert symlinker.symlink(Movie({})) is False


def test_symlink_movie(symlinker, movie, fs):
def symlink_path(movie: Movie) -> Path:
"""
Simplistic version of Symlinker._create_item_folders
"""
name = symlinker._determine_file_name(movie)
return symlinker.library_path_movies / name / (name + ".mkv")

def symlink_check(target: Path):
"""
Runs symlinker, confirms the movie's symlink is in the right place and points to the real path.
"""
# Create "real" file, run symlinker
fs.create_file(target)
assert symlinker._symlink(movie) is True

# Validate the symlink
assert Path(movie.symlink_path) == symlink_path(movie)
assert Path(movie.symlink_path).is_symlink()
assert Path(movie.symlink_path).readlink() == target

# cleanup
shutil.rmtree(symlinker.rclone_path) and symlinker.rclone_path.mkdir()
shutil.rmtree(
symlinker.library_path_movies
) and symlinker.library_path_movies.mkdir()

file = f"{movie.title}.mkv"

movie.folder, movie.alternative_folder, movie.file = (movie.title, "other", file)
symlink_check(symlinker.rclone_path / movie.title / file)
symlink_check(symlinker.rclone_path / "other" / file)
symlink_check(symlinker.rclone_path / file / file)
symlink_check(symlinker.rclone_path / file)

# files in the root
movie.folder, movie.alternative_folder, movie.file = (None, None, file)
symlink_check(symlinker.rclone_path / file)


def test_symlink_episode(symlinker, episode, fs):
season_name = "Season %02d" % episode.parent.number

def symlink_path(episode: Episode) -> Path:
"""
Simplistic version of Symlinker._create_item_folders
"""
show = episode.parent.parent
show_name = f"{show.title} ({show.aired_at.year}) {{imdb-{show.imdb_id}}}"
episode_name = symlinker._determine_file_name(episode)
return (
symlinker.library_path_shows
/ show_name
/ season_name
/ (episode_name + ".mkv")
)

def symlink_check(target: Path):
"""
Runs symlinker, confirms the episode's symlink is in the right place and points to the real path.
"""
# Create "real" file, run symlinker
fs.create_file(target)
assert symlinker._symlink(episode) is True

# Validate the symlink
assert Path(episode.symlink_path) == symlink_path(episode)
assert Path(episode.symlink_path).is_symlink()
assert Path(episode.symlink_path).readlink() == target

# cleanup
shutil.rmtree(symlinker.rclone_path) and symlinker.rclone_path.mkdir()
shutil.rmtree(
symlinker.library_path_shows
) and symlinker.library_path_shows.mkdir()

file = f"{episode.title}.mkv"

# Common namings
episode.folder, episode.alternative_folder, episode.file = (
episode.parent.parent.title,
"other",
file,
)
# symlink_check(symlinker.rclone_path / episode.parent.parent.title / season_name / file) # Not supported
symlink_check(symlinker.rclone_path / episode.parent.parent.title / file)
# symlink_check(symlinker.rclone_path / "other" / file)
symlink_check(symlinker.rclone_path / file / file)
symlink_check(symlinker.rclone_path / file)

# Somewhat less common: Show Name - Season 01 / file
name = str(episode.parent.parent.title + season_name)
episode.folder, episode.alternative_folder, episode.file = (name, None, file)
symlink_check(symlinker.rclone_path / name / file)

# Files in the rclone root
episode.folder, episode.alternative_folder, episode.file = (None, None, file)
symlink_check(symlinker.rclone_path / file)

0 comments on commit 667e884

Please sign in to comment.