Skip to content

Commit

Permalink
fix: address memory usage (#787)
Browse files Browse the repository at this point in the history
* feat: start refactor on id handling

* fix: more fixes towards handling ids

* fix: more work done

* fix: improved retry lib for logging

* fix: update deps and add psutil

* fix: more work on getting new content added

* fix: fix retry/reset endpoints

* fix: use event instead of add_item on retry endpoint

* fix: issue with duplicate symlink detection on db init

* fix: more work.. need to

* fix: fixed session management when processing threads

* fix: updated add_item

* fix: fix event processing with item_id handling

* fix: attempt #1 at duplicates

* fix: attempt #2. use same removal process as remove endpoint

* fix: check type first and delete duplicate shows appropriately

* fix: repurpose deletions to work on any item

* fix: revert duplicate changes. change retry lib back to normal, set to 24h

* refactor: rename private methods to public, remove filtering items in content services

* chore: rename store_item and add prefixes to threads to identify

* chore: Events to hold service, not service name, fixes multiple notifications

* fix: stopiter error fix

* fix: add logging...

* fix: whoops

* feat: start refactor on id handling

* fix: more fixes towards handling ids

* fix: more work done

* fix: improved retry lib for logging

* fix: update deps and add psutil

* fix: more work on getting new content added

* fix: fix retry/reset endpoints

* fix: use event instead of add_item on retry endpoint

* fix: issue with duplicate symlink detection on db init

* fix: more work.. need to

* fix: fixed session management when processing threads

* fix: updated add_item

* fix: fix event processing with item_id handling

* fix: attempt #1 at duplicates

* fix: attempt #2. use same removal process as remove endpoint

* fix: check type first and delete duplicate shows appropriately

* fix: repurpose deletions to work on any item

* fix: revert duplicate changes. change retry lib back to normal, set to 24h

* refactor: rename private methods to public, remove filtering items in content services

* chore: rename store_item and add prefixes to threads to identify

* chore: Events to hold service, not service name, fixes multiple notifications

* fix: stopiter error fix

* fix: add logging...

* fix: whoops

---------

Co-authored-by: davidemarcoli <davide@marcoli.ch>
Co-authored-by: Gaisberg <None>
  • Loading branch information
dreulavelle and davidemarcoli authored Oct 18, 2024
1 parent a4e0793 commit 612964e
Show file tree
Hide file tree
Showing 21 changed files with 938 additions and 1,073 deletions.
1,050 changes: 541 additions & 509 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ subliminal = "^2.2.1"
rank-torrent-name = "^1.0.2"
jsonschema = "^4.23.0"
scalar-fastapi = "^1.0.3"
psutil = "^6.0.0"

[tool.poetry.group.dev.dependencies]
pyright = "^1.1.352"
Expand Down
34 changes: 18 additions & 16 deletions src/controllers/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ async def get_items_by_imdb_ids(request: Request, imdb_ids: str) -> list[dict]:

class ResetResponse(BaseModel):
message: str
ids: list[str]
ids: list[int]


@router.post(
Expand All @@ -284,7 +284,7 @@ async def reset_items(request: Request, ids: str) -> ResetResponse:
media_items_generator = get_media_items_by_ids(ids)
for media_item in media_items_generator:
try:
request.app.program.em.cancel_job(media_item)
request.app.program.em.cancel_job(media_item._id)
clear_streams(media_item)
reset_media_item(media_item)
except ValueError as e:
Expand All @@ -300,7 +300,7 @@ async def reset_items(request: Request, ids: str) -> ResetResponse:

class RetryResponse(BaseModel):
message: str
ids: list[str]
ids: list[int]


@router.post(
Expand All @@ -310,13 +310,15 @@ class RetryResponse(BaseModel):
operation_id="retry_items",
)
async def retry_items(request: Request, ids: str) -> RetryResponse:
"""Re-add items to the queue"""
ids = handle_ids(ids)
try:
media_items_generator = get_media_items_by_ids(ids)
for media_item in media_items_generator:
request.app.program.em.cancel_job(media_item)
request.app.program.em.cancel_job(media_item._id)
await asyncio.sleep(0.1) # Ensure cancellation is processed
request.app.program.em.add_item(media_item)
# request.app.program.em.add_item(media_item)
request.app.program.em.add_event(Event("RetryItem", media_item._id))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))

Expand All @@ -340,26 +342,26 @@ async def remove_item(request: Request, ids: str) -> RemoveResponse:
media_items: list[int] = get_parent_ids(ids)
if not media_items:
return HTTPException(status_code=404, detail="Item(s) not found")

for media_item in media_items:
logger.debug(f"Removing item with ID {media_item}")
request.app.program.em.cancel_job_by_id(media_item)
for item_id in media_items:
logger.debug(f"Removing item with ID {item_id}")
request.app.program.em.cancel_job(item_id)
await asyncio.sleep(0.2) # Ensure cancellation is processed
clear_streams_by_id(media_item)
clear_streams_by_id(item_id)

symlink_service = request.app.program.services.get(Symlinker)
if symlink_service:
symlink_service.delete_item_symlinks_by_id(media_item)
symlink_service.delete_item_symlinks_by_id(item_id)

with db.Session() as session:
requested_id = session.execute(select(MediaItem.requested_id).where(MediaItem._id == media_item)).scalar_one()
requested_id = session.execute(select(MediaItem.requested_id).where(MediaItem._id == item_id)).scalar_one()
if requested_id:
logger.debug(f"Deleting request from Overseerr with ID {requested_id}")
Overseerr.delete_request(requested_id)

logger.debug(f"Deleting item from database with ID {media_item}")
delete_media_item_by_id(media_item)
logger.info(f"Successfully removed item with ID {media_item}")
logger.debug(f"Deleting item from database with ID {item_id}")
delete_media_item_by_id(item_id)
logger.info(f"Successfully removed item with ID {item_id}")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))

Expand Down Expand Up @@ -489,7 +491,7 @@ def set_torrent_rd(request: Request, id: int, torrent_id: str) -> SetTorrentRDRe

session.commit()

request.app.program.em.add_event(Event("Symlinker", item))
request.app.program.em.add_event(Event("Symlinker", item._id))

return {
"success": True,
Expand Down
18 changes: 4 additions & 14 deletions src/controllers/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pydantic
from fastapi import APIRouter, Request
from program.content.overseerr import Overseerr
from program.db.db_functions import _ensure_item_exists_in_db
from program.db.db_functions import ensure_item_exists_in_db
from program.indexers.trakt import get_imdbid_from_tmdb, get_imdbid_from_tvdb
from program.media.item import MediaItem
from requests import RequestException
Expand Down Expand Up @@ -40,22 +40,12 @@ async def overseerr(request: Request) -> Dict[str, Any]:
logger.error("Overseerr not initialized")
return {"success": False, "message": "Overseerr not initialized"}

try:
new_item = MediaItem({"imdb_id": imdb_id, "requested_by": "overseerr", "requested_id": req.request.request_id})
except Exception as e:
logger.error(f"Failed to create item for {imdb_id}: {e}")
return {"success": False, "message": str(e)}

if _ensure_item_exists_in_db(new_item) or imdb_id in overseerr.recurring_items:
new_item = MediaItem({"imdb_id": imdb_id, "requested_by": "overseerr", "requested_id": req.request.request_id})
if ensure_item_exists_in_db(new_item):
logger.log("API", "Request already in queue or already exists in the database")
return {"success": True}
else:
overseerr.recurring_items.add(imdb_id)

try:
request.app.program.em.add_item(new_item)
except Exception as e:
logger.error(f"Failed to add item for {imdb_id}: {e}")
request.app.program.em.add_item(new_item, service="Overseerr")

return {"success": True}

Expand Down
27 changes: 26 additions & 1 deletion src/program/content/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
# from typing import Generator
# from program.media.item import MediaItem

from .listrr import Listrr
from .mdblist import Mdblist
from .overseerr import Overseerr
from .plex_watchlist import PlexWatchlist
from .trakt import TraktContent

__all__ = ["Listrr", "Mdblist", "Overseerr", "PlexWatchlist", "TraktContent"]
__all__ = ["Listrr", "Mdblist", "Overseerr", "PlexWatchlist", "TraktContent"]

# class Requester:
# def __init__(self):
# self.key = "content"
# self.initialized = False
# self.services = {
# Listrr: Listrr(),
# Mdblist: Mdblist(),
# Overseerr: Overseerr(),
# PlexWatchlist: PlexWatchlist(),
# TraktContent: TraktContent()
# }
# self.initialized = self.validate()
# if not self.initialized:
# return

# def validate(self):
# return any(service.initialized for service in self.services.values())

# def run(self, item: MediaItem) -> Generator[MediaItem, None, None]:
# """Index newly requested items."""
# yield item
12 changes: 2 additions & 10 deletions src/program/content/listrr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from requests.exceptions import HTTPError

from program.db.db_functions import _filter_existing_items
from program.indexers.trakt import get_imdbid_from_tmdb
from program.media.item import MediaItem
from program.settings.manager import settings_manager
Expand All @@ -22,7 +21,6 @@ def __init__(self):
self.initialized = self.validate()
if not self.initialized:
return
self.recurring_items: set[str] = set()
logger.success("Listrr initialized!")

def validate(self) -> bool:
Expand Down Expand Up @@ -67,14 +65,8 @@ def run(self) -> Generator[MediaItem, None, None]:
return

listrr_items = movie_items + show_items
non_existing_items = _filter_existing_items(listrr_items)
new_non_recurring_items = [item for item in non_existing_items if item.imdb_id not in self.recurring_items]
self.recurring_items.update([item.imdb_id for item in new_non_recurring_items])

if new_non_recurring_items:
logger.info(f"Fetched {len(new_non_recurring_items)} new items from Listrr")

yield new_non_recurring_items
logger.info(f"Fetched {len(listrr_items)} items from Listrr")
yield listrr_items

def _get_items_from_Listrr(self, content_type, content_lists) -> list[MediaItem]: # noqa: C901, PLR0912
"""Fetch unique IMDb IDs from Listrr for a given type and list of content."""
Expand Down
11 changes: 2 additions & 9 deletions src/program/content/mdblist.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from typing import Generator

from program.db.db_functions import _filter_existing_items
from program.media.item import MediaItem
from program.settings.manager import settings_manager
from utils.logger import logger
Expand All @@ -21,7 +20,6 @@ def __init__(self):
return
self.requests_per_2_minutes = self._calculate_request_time()
self.rate_limiter = RateLimiter(self.requests_per_2_minutes, 120, True)
self.recurring_items = set()
logger.success("mdblist initialized")

def validate(self):
Expand Down Expand Up @@ -63,13 +61,8 @@ def run(self) -> Generator[MediaItem, None, None]:
except RateLimitExceeded:
pass

non_existing_items = _filter_existing_items(items_to_yield)
new_non_recurring_items = [item for item in non_existing_items if item.imdb_id not in self.recurring_items and isinstance(item, MediaItem)]
self.recurring_items.update([item.imdb_id for item in new_non_recurring_items])

if new_non_recurring_items:
logger.info(f"Found {len(new_non_recurring_items)} new items to fetch")
yield new_non_recurring_items
logger.info(f"Fetched {len(items_to_yield)} items from mdblist.com")
yield items_to_yield

def _calculate_request_time(self):
limits = my_limits(self.settings.api_key).limits
Expand Down
10 changes: 2 additions & 8 deletions src/program/content/overseerr.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from requests.exceptions import ConnectionError, RetryError
from urllib3.exceptions import MaxRetryError, NewConnectionError

from program.db.db_functions import _filter_existing_items
from program.indexers.trakt import get_imdbid_from_tmdb
from program.media.item import MediaItem
from program.settings.manager import settings_manager
Expand All @@ -24,7 +23,6 @@ def __init__(self):
self.run_once = False
if not self.initialized:
return
self.recurring_items: set[str] = set()
logger.success("Overseerr initialized!")

def validate(self) -> bool:
Expand Down Expand Up @@ -58,18 +56,14 @@ def run(self):
return

overseerr_items: list[MediaItem] = self.get_media_requests()
non_existing_items = _filter_existing_items(overseerr_items)
new_non_recurring_items = [item for item in non_existing_items if item.imdb_id not in self.recurring_items and isinstance(item, MediaItem)]
self.recurring_items.update([item.imdb_id for item in new_non_recurring_items])

if self.settings.use_webhook:
logger.debug("Webhook is enabled. Running Overseerr once before switching to webhook only mode")
self.run_once = True

if new_non_recurring_items:
logger.info(f"Fetched {len(new_non_recurring_items)} new items from Overseerr")
logger.info(f"Fetched {len(overseerr_items)} items from overseerr")

yield new_non_recurring_items
yield overseerr_items

def get_media_requests(self) -> list[MediaItem]:
"""Get media requests from `Overseerr`"""
Expand Down
10 changes: 2 additions & 8 deletions src/program/content/plex_watchlist.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from requests import HTTPError, Session
from urllib3 import HTTPConnectionPool

from program.db.db_functions import _filter_existing_items
from program.media.item import Episode, MediaItem, Movie, Season, Show
from program.settings.manager import settings_manager
from utils.logger import logger
Expand All @@ -25,7 +24,6 @@ def __init__(self):
self.initialized = self.validate()
if not self.initialized:
return
self.recurring_items: set[str] = set() # set of imdb ids
logger.success("Plex Watchlist initialized!")

def validate(self):
Expand Down Expand Up @@ -70,14 +68,10 @@ def run(self) -> Generator[MediaItem, None, None]:

plex_items: set[str] = set(watchlist_items) | set(rss_items)
items_to_yield: list[MediaItem] = [MediaItem({"imdb_id": imdb_id, "requested_by": self.key}) for imdb_id in plex_items if imdb_id and imdb_id.startswith("tt")]
non_existing_items = _filter_existing_items(items_to_yield)
new_non_recurring_items = [item for item in non_existing_items if item.imdb_id not in self.recurring_items and isinstance(item, MediaItem)]
self.recurring_items.update([item.imdb_id for item in new_non_recurring_items])

if new_non_recurring_items:
logger.info(f"Found {len(new_non_recurring_items)} new items to fetch")
logger.info(f"Fetched {len(items_to_yield)} items from plex watchlist")
yield items_to_yield

yield new_non_recurring_items

def _get_items_from_rss(self) -> list[str]:
"""Fetch media from Plex RSS Feeds."""
Expand Down
18 changes: 2 additions & 16 deletions src/program/content/trakt.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from requests import RequestException

from program.db.db_functions import _filter_existing_items
from program.media.item import MediaItem
from program.settings.manager import settings_manager
from utils.logger import logger
Expand All @@ -30,8 +29,6 @@ def __init__(self):
if not self.initialized:
return
self.next_run_time = 0
self.recurring_items = set()
self.items_already_seen = set()
self.missing()
logger.success("Trakt initialized!")

Expand Down Expand Up @@ -93,19 +90,8 @@ def run(self):
if not items_to_yield:
return

non_existing_items = _filter_existing_items(items_to_yield)
new_non_recurring_items = [
item
for item in non_existing_items
if item.imdb_id not in self.recurring_items
and isinstance(item, MediaItem)
]
self.recurring_items.update(item.imdb_id for item in new_non_recurring_items)

if new_non_recurring_items:
logger.log("TRAKT", f"Found {len(new_non_recurring_items)} new items to fetch")

yield new_non_recurring_items
logger.info(f"Fetched {len(items_to_yield)} items from trakt")
yield items_to_yield

def _get_watchlist(self, watchlist_users: list) -> list:
"""Get IMDb IDs from Trakt watchlist"""
Expand Down
2 changes: 1 addition & 1 deletion src/program/db/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from utils import data_dir_path
from utils.logger import logger

engine_options={
engine_options = {
"pool_size": 25, # Prom: Set to 1 when debugging sql queries
"max_overflow": 25, # Prom: Set to 0 when debugging sql queries
"pool_pre_ping": True, # Prom: Set to False when debugging sql queries
Expand Down
Loading

0 comments on commit 612964e

Please sign in to comment.