From 31677a275916c043a0135dd2adec97884047cc4a Mon Sep 17 00:00:00 2001 From: Arrrrr Date: Thu, 21 Nov 2024 04:36:44 -0800 Subject: [PATCH 1/4] feat: pause able media items --- ...00_c99239e3445f_add_pause_functionality.py | 40 ++++ src/program/media/item.py | 50 +++- src/program/state_transition.py | 69 +----- src/routers/secure/items.py | 225 +++++++++++++++--- 4 files changed, 288 insertions(+), 96 deletions(-) create mode 100644 src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py diff --git a/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py b/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py new file mode 100644 index 00000000..40c67b46 --- /dev/null +++ b/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py @@ -0,0 +1,40 @@ +"""add_pause_functionality + +Revision ID: c99239e3445f +revision: str = 'c99239e3445f' +Revises: c99709e3648f +Create Date: 2024-11-14 16:00:00.000000 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = '[generate a new revision ID]' +down_revision: Union[str, None] = 'c99709e3648f' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Add pause-related columns to MediaItem table + op.add_column('MediaItem', + sa.Column('is_paused', sa.Boolean(), nullable=True, default=False)) + op.add_column('MediaItem', + sa.Column('paused_at', sa.DateTime(), nullable=True)) + op.add_column('MediaItem', + sa.Column('paused_by', sa.String(), nullable=True)) + + # Add index for is_paused column + op.create_index(op.f('ix_mediaitem_is_paused'), 'MediaItem', ['is_paused']) + + +def downgrade() -> None: + # Remove pause-related columns from MediaItem table + op.drop_index(op.f('ix_mediaitem_is_paused'), table_name='MediaItem') + op.drop_column('MediaItem', 'paused_by') + op.drop_column('MediaItem', 'paused_at') + op.drop_column('MediaItem', 'is_paused') diff --git a/src/program/media/item.py b/src/program/media/item.py index 5b081963..07a623a8 100644 --- a/src/program/media/item.py +++ b/src/program/media/item.py @@ -77,6 +77,10 @@ class MediaItem(db.Model): last_state: Mapped[Optional[States]] = mapped_column(sqlalchemy.Enum(States), default=States.Unknown) subtitles: Mapped[list[Subtitle]] = relationship(Subtitle, back_populates="parent", lazy="selectin", cascade="all, delete-orphan") + # Pause related fields + is_paused: Mapped[Optional[bool]] = mapped_column(sqlalchemy.Boolean, default=False) + paused_at: Mapped[Optional[datetime]] = mapped_column(sqlalchemy.DateTime, nullable=True) + __mapper_args__ = { "polymorphic_identity": "mediaitem", "polymorphic_on":"type", @@ -86,6 +90,7 @@ class MediaItem(db.Model): __table_args__ = ( Index("ix_mediaitem_type", "type"), Index("ix_mediaitem_requested_by", "requested_by"), + Index("ix_mediaitem_is_paused", "is_paused"), Index("ix_mediaitem_title", "title"), Index("ix_mediaitem_imdb_id", "imdb_id"), Index("ix_mediaitem_tvdb_id", "tvdb_id"), @@ -257,6 +262,8 @@ def to_dict(self): "requested_by": self.requested_by, "scraped_at": str(self.scraped_at), "scraped_times": self.scraped_times, + "is_paused": self.is_paused, + "paused_at": str(self.paused_at) if self.paused_at else None, } def to_extended_dict(self, abbreviated_children=False, with_streams=True): @@ -407,6 +414,47 @@ def _reset(self): def log_string(self): return self.title or self.id + def pause(self) -> None: + """Pause processing of this media item""" + if self.is_paused: + logger.debug(f"{self.log_string} is already paused") + return + + logger.debug(f"Pausing {self.id}") + try: + self.is_paused = True + self.paused_at = datetime.now() + + session = object_session(self) + if session: + session.flush() + + logger.info(f"{self.log_string} paused, is_paused={self.is_paused}, paused_at={self.paused_at}") + except Exception as e: + logger.error(f"Failed to pause {self.log_string}: {str(e)}") + raise + + def unpause(self) -> None: + """Resume processing of this media item""" + + if not self.is_paused: + logger.debug(f"{self.log_string} is not paused") + return + + logger.debug(f"Unpausing {self.id}") + try: + self.is_paused = False + self.paused_at = None + + session = object_session(self) + if session: + session.flush() + + logger.info(f"{self.log_string} unpaused, is_paused={self.is_paused}, paused_at={self.paused_at}") + except Exception as e: + logger.error(f"Failed to unpause {self.log_string}: {str(e)}") + raise + @property def collection(self): return self.parent.collection if self.parent else self.id @@ -730,4 +778,4 @@ def copy_item(item): elif isinstance(item, MediaItem): return MediaItem(item={}).copy(item) else: - raise ValueError(f"Cannot copy item of type {type(item)}") \ No newline at end of file + raise ValueError(f"Cannot copy item of type {type(item)}") diff --git a/src/program/state_transition.py b/src/program/state_transition.py index 9ecec854..621742cc 100644 --- a/src/program/state_transition.py +++ b/src/program/state_transition.py @@ -17,67 +17,10 @@ def process_event(emitted_by: Service, existing_item: MediaItem | None = None, c no_further_processing: ProcessedEvent = (None, []) items_to_submit = [] -#TODO - Reindex non-released badly indexed items here - if content_item or (existing_item is not None and existing_item.last_state == States.Requested): - next_service = TraktIndexer - logger.debug(f"Submitting {content_item.log_string if content_item else existing_item.log_string} to trakt indexer") - return next_service, [content_item or existing_item] - - elif existing_item is not None and existing_item.last_state in [States.PartiallyCompleted, States.Ongoing]: - if existing_item.type == "show": - for season in existing_item.seasons: - if season.last_state not in [States.Completed, States.Unreleased]: - _, sub_items = process_event(emitted_by, season, None) - items_to_submit += sub_items - elif existing_item.type == "season": - for episode in existing_item.episodes: - if episode.last_state != States.Completed: - _, sub_items = process_event(emitted_by, episode, None) - items_to_submit += sub_items - - elif existing_item is not None and existing_item.last_state == States.Indexed: - next_service = Scraping - if emitted_by != Scraping and Scraping.should_submit(existing_item): - items_to_submit = [existing_item] - elif existing_item.type == "show": - items_to_submit = [s for s in existing_item.seasons if s.last_state != States.Completed and Scraping.should_submit(s)] - elif existing_item.type == "season": - items_to_submit = [e for e in existing_item.episodes if e.last_state != States.Completed and Scraping.should_submit(e)] - - elif existing_item is not None and existing_item.last_state == States.Scraped: - next_service = Downloader - items_to_submit = [existing_item] - - elif existing_item is not None and existing_item.last_state == States.Downloaded: - next_service = Symlinker - items_to_submit = [existing_item] - - elif existing_item is not None and existing_item.last_state == States.Symlinked: - next_service = Updater - items_to_submit = [existing_item] - - elif existing_item is not None and existing_item.last_state == States.Completed: - # If a user manually retries an item, lets not notify them again - if emitted_by not in ["RetryItem", PostProcessing]: - notify(existing_item) - # Avoid multiple post-processing runs - if emitted_by != PostProcessing: - if settings_manager.settings.post_processing.subliminal.enabled: - next_service = PostProcessing - if existing_item.type in ["movie", "episode"] and Subliminal.should_submit(existing_item): - items_to_submit = [existing_item] - elif existing_item.type == "show": - items_to_submit = [e for s in existing_item.seasons for e in s.episodes if e.last_state == States.Completed and Subliminal.should_submit(e)] - elif existing_item.type == "season": - items_to_submit = [e for e in existing_item.episodes if e.last_state == States.Completed and Subliminal.should_submit(e)] - if not items_to_submit: - return no_further_processing - else: - - return no_further_processing - - # if items_to_submit and next_service: - # for item in items_to_submit: - # logger.debug(f"Submitting {item.log_string} ({item.id}) to {next_service if isinstance(next_service, str) else next_service.__name__}") - + # Skip processing if item is paused + if existing_item and existing_item.is_paused: + logger.debug(f"Skipping {existing_item.log_string} - item is paused") + return no_further_processing + + #not sure if i need to remove this return next_service, items_to_submit diff --git a/src/routers/secure/items.py b/src/routers/secure/items.py index da7ead7b..a200f2ff 100644 --- a/src/routers/secure/items.py +++ b/src/routers/secure/items.py @@ -12,7 +12,7 @@ from program.db import db_functions from program.db.db import db, get_db -from program.media.item import MediaItem +from program.media.item import MediaItem, MediaType # Import MediaType from item module from program.media.state import States from program.services.content import Overseerr from program.symlink import Symlinker @@ -20,10 +20,50 @@ from ..models.shared import MessageResponse +class StateResponse(BaseModel): + success: bool + states: list[str] + +class ItemsResponse(BaseModel): + success: bool + items: list[dict] + page: int + limit: int + total_items: int + total_pages: int + +class ResetResponse(BaseModel): + message: str + ids: list[str] + +class RetryResponse(BaseModel): + message: str + ids: list[str] + +class RemoveResponse(BaseModel): + message: str + ids: list[str] + +class PauseResponse(BaseModel): + """Response model for pause/unpause operations""" + message: str + ids: list[str] + +class PauseStateResponse(BaseModel): + """Response model for pause state check""" + is_paused: bool + paused_at: Optional[str] + item_id: str + title: Optional[str] + +class AllPausedResponse(BaseModel): + """Response model for getting all paused items""" + count: int + items: list[dict] + router = APIRouter( prefix="/items", tags=["items"], - responses={404: {"description": "Not found"}}, ) @@ -34,11 +74,6 @@ def handle_ids(ids: str) -> list[str]: return ids -class StateResponse(BaseModel): - success: bool - states: list[str] - - @router.get("/states", operation_id="get_states") async def get_states() -> StateResponse: return { @@ -47,15 +82,6 @@ async def get_states() -> StateResponse: } -class ItemsResponse(BaseModel): - success: bool - items: list[dict] - page: int - limit: int - total_items: int - total_pages: int - - @router.get( "", summary="Retrieve Media Items", @@ -254,11 +280,6 @@ async def get_items_by_imdb_ids(request: Request, imdb_ids: str) -> list[dict]: return [item.to_extended_dict() for item in items] -class ResetResponse(BaseModel): - message: str - ids: list[str] - - @router.post( "/reset", summary="Reset Media Items", @@ -284,11 +305,6 @@ async def reset_items(request: Request, ids: str) -> ResetResponse: return {"message": f"Reset items with id {ids}", "ids": ids} -class RetryResponse(BaseModel): - message: str - ids: list[str] - - @router.post( "/retry", summary="Retry Media Items", @@ -314,11 +330,6 @@ async def retry_items(request: Request, ids: str) -> RetryResponse: return {"message": f"Retried items with ids {ids}", "ids": ids} -class RemoveResponse(BaseModel): - message: str - ids: list[str] - - @router.delete( "/remove", summary="Remove Media Items", @@ -432,4 +443,154 @@ async def unblacklist_stream(_: Request, item_id: str, stream_id: int, db: Sessi return { "message": f"Unblacklisted stream {stream_id} for item {item_id}", - } \ No newline at end of file + } + +@router.post("/{id}/pause", response_model=PauseResponse, operation_id="pause_item") +@router.post("/pause", response_model=PauseResponse, operation_id="pause_items") +async def pause_items(request: Request, ids: str = None, db: Session = Depends(get_db)): + """Pause media items from being processed""" + item_ids = handle_ids(ids) + + items = db.execute( + select(MediaItem).where(MediaItem.id.in_(item_ids)) + ).unique().scalars().all() + + if not items: + raise HTTPException(status_code=404, detail="No items found") + + try: + for item in items: + item.pause() # or item.unpause() + db.commit() + except Exception as e: + db.rollback() + logger.error(f"Failed to update items: {str(e)}") + raise HTTPException(status_code=500, detail="Failed to update items.") + + return { + "message": f"Successfully paused {len(items)} items", + "ids": item_ids + } + +@router.post("/{id}/unpause", response_model=PauseResponse, operation_id="unpause_item") +@router.post("/unpause", response_model=PauseResponse, operation_id="unpause_items") +async def unpause_items(request: Request, ids: str = None, db: Session = Depends(get_db)): + """Unpause media items to resume processing""" + item_ids = handle_ids(ids) + + items = db.execute( + select(MediaItem).where(MediaItem.id.in_(item_ids)) + ).unique().scalars().all() + + if not items: + raise HTTPException(status_code=404, detail="No items found") + try: + for item in items: + item.unpause() + db.commit() + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Failed to unpause items: {str(e)}") + raise HTTPException(status_code=500, detail="Failed to unpause items") + + return { + "message": f"Successfully unpaused {len(items)} items", + "ids": item_ids + } + +@router.get("/{id}/pause", response_model=PauseStateResponse, operation_id="get_pause_state") +async def get_pause_state(request: Request, id: str, db: Session = Depends(get_db)): + """Check if a media item is paused""" + item = db.execute( + select(MediaItem).where(MediaItem.id == id) + ).unique().scalar_one_or_none() + + if not item: + raise HTTPException(status_code=404, detail="Item not found") + + return { + "is_paused": item.is_paused, + "paused_at": str(item.paused_at) if item.paused_at else None, + "item_id": item.id, + "title": item.title + } + +@router.get("/paused", response_model=AllPausedResponse, operation_id="get_all_paused", responses={200: {"description": "Success, even if no items found"}}, response_model_exclude_none=True) +async def get_all_paused( + request: Request, + type: Optional[str] = None, + db: Session = Depends(get_db) +): + """Get all paused media items, optionally filtered by type""" + # Use raw SQL to verify data + query = select( + MediaItem.id, + MediaItem.title, + MediaItem.type, + MediaItem.is_paused, + MediaItem.paused_at + ).where(MediaItem.is_paused.is_(True)) + + if type: + valid_types = [t.value for t in MediaType] + if type not in valid_types: + raise HTTPException( + status_code=400, + detail=f"Invalid type. Must be one of: {', '.join(valid_types)}" + ) + query = query.where(MediaItem.type == type) + + logger.debug(f"Executing query: {str(query)}") + result = db.execute(query) + rows = result.fetchall() + logger.debug(f"Raw SQL found {len(rows)} rows") + + # Map results to response model + items = [ + { + "id": row.id, + "title": row.title, + "type": row.type, + "paused_at": str(row.paused_at) if row.paused_at else None + } + for row in rows + ] + + return AllPausedResponse(count=len(items), items=items) + +@router.get("/paused/count", response_model=dict, operation_id="get_paused_count") +async def get_paused_count( + request: Request, + type: Optional[str] = None, + db: Session = Depends(get_db) +): + """Get total count of paused items, optionally filtered by type""" + try: + # Build query using SQLAlchemy + query = select(func.count(MediaItem.id)).where(MediaItem.is_paused.is_(True)) + + if type: + valid_types = [t.value for t in MediaType] + if type not in valid_types: + raise HTTPException( + status_code=400, + detail=f"Invalid type. Must be one of: {', '.join(valid_types)}" + ) + query = query.where(MediaItem.type == type) + + logger.debug(f"Executing count query: {str(query)}") + count = db.scalar(query) or 0 # Default to 0 if None + logger.debug(f"Found {count} paused items") + + return { + "total": count, + "type": type if type else "all" + } + except SQLAlchemyError as e: + logger.error(f"Error in get_paused_count: {str(e)}") + # Return 0 count on error + return { + "total": 0, + "type": type if type else "all", + "error": "Failed to get count" + } From f395b7fd3a1e552185209c58eba4e8159916cc9b Mon Sep 17 00:00:00 2001 From: Arrrrr Date: Wed, 27 Nov 2024 17:14:46 -0800 Subject: [PATCH 2/4] feat: add pause function --- ...20241114_1400_c99239e3445f_add_pause_functionality.py | 3 +++ src/program/media/item.py | 9 +++++++-- src/routers/secure/items.py | 8 ++++---- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py b/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py index 40c67b46..cbfe8cd7 100644 --- a/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py +++ b/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py @@ -25,6 +25,8 @@ def upgrade() -> None: sa.Column('is_paused', sa.Boolean(), nullable=True, default=False)) op.add_column('MediaItem', sa.Column('paused_at', sa.DateTime(), nullable=True)) + op.add_column('MediaItem', + sa.Column('unpaused_at', sa.DateTime(), nullable=True)) op.add_column('MediaItem', sa.Column('paused_by', sa.String(), nullable=True)) @@ -36,5 +38,6 @@ def downgrade() -> None: # Remove pause-related columns from MediaItem table op.drop_index(op.f('ix_mediaitem_is_paused'), table_name='MediaItem') op.drop_column('MediaItem', 'paused_by') + op.drop_column('MediaItem', 'unpaused_at') op.drop_column('MediaItem', 'paused_at') op.drop_column('MediaItem', 'is_paused') diff --git a/src/program/media/item.py b/src/program/media/item.py index c51336e0..45a67cf7 100644 --- a/src/program/media/item.py +++ b/src/program/media/item.py @@ -64,6 +64,8 @@ class MediaItem(db.Model): # Pause related fields is_paused: Mapped[Optional[bool]] = mapped_column(sqlalchemy.Boolean, default=False) paused_at: Mapped[Optional[datetime]] = mapped_column(sqlalchemy.DateTime, nullable=True) + unpaused_at: Mapped[Optional[datetime]] = mapped_column(sqlalchemy.DateTime, nullable=True) + paused_by: Mapped[Optional[str]] = mapped_column(sqlalchemy.String, nullable=True) __mapper_args__ = { "polymorphic_identity": "mediaitem", @@ -248,6 +250,7 @@ def to_dict(self): "scraped_times": self.scraped_times, "is_paused": self.is_paused, "paused_at": str(self.paused_at) if self.paused_at else None, + "unpaused_at": str(self.unpaused_at) if self.unpaused_at else None, } def to_extended_dict(self, abbreviated_children=False, with_streams=True): @@ -408,6 +411,7 @@ def pause(self) -> None: try: self.is_paused = True self.paused_at = datetime.now() + self.unpaused_at = None session = object_session(self) if session: @@ -428,13 +432,14 @@ def unpause(self) -> None: logger.debug(f"Unpausing {self.id}") try: self.is_paused = False - self.paused_at = None + self.unpaused_at = datetime.now() + # Keep paused_at for history session = object_session(self) if session: session.flush() - logger.info(f"{self.log_string} unpaused, is_paused={self.is_paused}, paused_at={self.paused_at}") + logger.info(f"{self.log_string} unpaused, is_paused={self.is_paused}, unpaused_at={self.unpaused_at}") except Exception as e: logger.error(f"Failed to unpause {self.log_string}: {str(e)}") raise diff --git a/src/routers/secure/items.py b/src/routers/secure/items.py index 9774e001..93a5eb4c 100644 --- a/src/routers/secure/items.py +++ b/src/routers/secure/items.py @@ -460,12 +460,12 @@ async def pause_items(request: Request, ids: str = None, db: Session = Depends(g try: for item in items: - item.pause() # or item.unpause() + item.pause() db.commit() - except Exception as e: + except SQLAlchemyError as e: db.rollback() - logger.error(f"Failed to update items: {str(e)}") - raise HTTPException(status_code=500, detail="Failed to update items.") + logger.error(f"Failed to pause items: {str(e)}") + raise HTTPException(status_code=500, detail="Failed to pause items") return { "message": f"Successfully paused {len(items)} items", From 692a4e8ab8773c562f80a58fe35e88367f73ee79 Mon Sep 17 00:00:00 2001 From: Arrrrr Date: Wed, 27 Nov 2024 20:18:49 -0800 Subject: [PATCH 3/4] feat : ad d pause --- ...00_c99239e3445f_add_pause_functionality.py | 9 +- src/program/media/item.py | 8 +- src/routers/secure/items.py | 447 +++++++++++------- 3 files changed, 283 insertions(+), 181 deletions(-) diff --git a/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py b/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py index cbfe8cd7..1aa2979f 100644 --- a/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py +++ b/src/alembic/versions/20241114_1400_c99239e3445f_add_pause_functionality.py @@ -1,7 +1,6 @@ """add_pause_functionality Revision ID: c99239e3445f -revision: str = 'c99239e3445f' Revises: c99709e3648f Create Date: 2024-11-14 16:00:00.000000 @@ -13,16 +12,16 @@ from alembic import op # revision identifiers, used by Alembic. -revision: str = '[generate a new revision ID]' +revision: str = 'c99239e3445f' down_revision: Union[str, None] = 'c99709e3648f' branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: - # Add pause-related columns to MediaItem table + # ### commands auto generated by Alembic - please adjust! ### op.add_column('MediaItem', - sa.Column('is_paused', sa.Boolean(), nullable=True, default=False)) + sa.Column('is_paused', sa.Boolean(), nullable=True, server_default='false')) op.add_column('MediaItem', sa.Column('paused_at', sa.DateTime(), nullable=True)) op.add_column('MediaItem', @@ -35,7 +34,7 @@ def upgrade() -> None: def downgrade() -> None: - # Remove pause-related columns from MediaItem table + # ### commands auto generated by Alembic - please adjust! ### op.drop_index(op.f('ix_mediaitem_is_paused'), table_name='MediaItem') op.drop_column('MediaItem', 'paused_by') op.drop_column('MediaItem', 'unpaused_at') diff --git a/src/program/media/item.py b/src/program/media/item.py index 45a67cf7..6bd67b2b 100644 --- a/src/program/media/item.py +++ b/src/program/media/item.py @@ -18,6 +18,12 @@ from ..db.db_functions import blacklist_stream, reset_streams from .stream import Stream +class MediaType(str, Enum): + """Media type enum""" + MOVIE = "movie" + SHOW = "show" + SEASON = "season" + EPISODE = "episode" class MediaItem(db.Model): """MediaItem class""" @@ -62,7 +68,7 @@ class MediaItem(db.Model): subtitles: Mapped[list[Subtitle]] = relationship(Subtitle, back_populates="parent", lazy="selectin", cascade="all, delete-orphan") # Pause related fields - is_paused: Mapped[Optional[bool]] = mapped_column(sqlalchemy.Boolean, default=False) + is_paused: Mapped[Optional[bool]] = mapped_column(sqlalchemy.Boolean, default=False, nullable=True, index=True) paused_at: Mapped[Optional[datetime]] = mapped_column(sqlalchemy.DateTime, nullable=True) unpaused_at: Mapped[Optional[datetime]] = mapped_column(sqlalchemy.DateTime, nullable=True) paused_by: Mapped[Optional[str]] = mapped_column(sqlalchemy.String, nullable=True) diff --git a/src/routers/secure/items.py b/src/routers/secure/items.py index 93a5eb4c..dafc3eec 100644 --- a/src/routers/secure/items.py +++ b/src/routers/secure/items.py @@ -1,3 +1,4 @@ +"""Items router""" import asyncio from datetime import datetime from typing import Literal, Optional @@ -6,19 +7,20 @@ from fastapi import APIRouter, Depends, HTTPException, Request, status from loguru import logger from pydantic import BaseModel -from sqlalchemy import and_, func, or_, select -from sqlalchemy.exc import NoResultFound -from sqlalchemy.orm import Session +from sqlalchemy import func, select +from sqlalchemy.exc import NoResultFound, SQLAlchemyError +from sqlalchemy.orm import Session, with_polymorphic -from program.db import db_functions -from program.db.db import db, get_db -from program.media.item import MediaItem, MediaType # Import MediaType from item module +from program.db.db import get_db +from program.media.item import MediaItem, MediaType, Movie, Show, Season, Episode from program.media.state import States from program.services.content import Overseerr from program.symlink import Symlinker from program.types import Event -from ..models.shared import MessageResponse +# Response Models +class MessageResponse(BaseModel): + message: str class StateResponse(BaseModel): success: bool @@ -67,11 +69,11 @@ class AllPausedResponse(BaseModel): ) -def handle_ids(ids: str) -> list[str]: - ids = [str(id) for id in ids.split(",")] if "," in ids else [str(ids)] +def handle_ids(ids: Optional[str]) -> list[str]: + """Handle comma-separated IDs or single ID""" if not ids: - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No item ID provided") - return ids + raise HTTPException(status_code=400, detail="No item ID provided") + return [str(id) for id in ids.split(",")] if "," in ids else [str(ids)] @router.get("/states", operation_id="get_states") @@ -213,6 +215,7 @@ async def get_items( summary="Add Media Items", description="Add media items with bases on imdb IDs", operation_id="add_items", + response_model=MessageResponse ) async def add_items(request: Request, imdb_ids: str = None) -> MessageResponse: if not imdb_ids: @@ -237,26 +240,7 @@ async def add_items(request: Request, imdb_ids: str = None) -> MessageResponse: ) request.app.program.em.add_item(item) - return {"message": f"Added {len(valid_ids)} item(s) to the queue"} - -@router.get( - "/{id}", - summary="Retrieve Media Item", - description="Fetch a single media item by ID", - operation_id="get_item", -) -async def get_item(_: Request, id: str, use_tmdb_id: Optional[bool] = False) -> dict: - with db.Session() as session: - try: - query = select(MediaItem) - if use_tmdb_id: - query = query.where(MediaItem.tmdb_id == id) - else: - query = query.where(MediaItem.id == id) - item = session.execute(query).unique().scalar_one() - except NoResultFound: - raise HTTPException(status_code=404, detail="Item not found") - return item.to_extended_dict(with_streams=False) + return MessageResponse(message=f"Added {len(valid_ids)} item(s) to the queue") @router.get( @@ -265,19 +249,26 @@ async def get_item(_: Request, id: str, use_tmdb_id: Optional[bool] = False) -> description="Fetch media items by IMDb IDs", operation_id="get_items_by_imdb_ids", ) -async def get_items_by_imdb_ids(request: Request, imdb_ids: str) -> list[dict]: - ids = imdb_ids.split(",") - with db.Session() as session: +async def get_items_by_imdb_ids( + imdb_ids: str, + session: Session = Depends(get_db) +) -> list[dict]: + """Get media items by IMDb IDs""" + try: items = [] - for id in ids: - item = ( - session.execute(select(MediaItem).where(MediaItem.imdb_id == id)) - .unique() - .scalar_one() - ) + for imdb_id in imdb_ids.split(","): + item = session.execute( + select(MediaItem) + .where(MediaItem.imdb_id == imdb_id) + ).scalar_one_or_none() + if item: items.append(item) - return [item.to_extended_dict() for item in items] + + return [item.to_dict() for item in items] + except SQLAlchemyError as e: + logger.error(f"Database error: {e}") + raise HTTPException(status_code=500, detail="Database error") @router.post( @@ -445,152 +436,258 @@ async def unblacklist_stream(_: Request, item_id: str, stream_id: int, db: Sessi "message": f"Unblacklisted stream {stream_id} for item {item_id}", } -@router.post("/{id}/pause", response_model=PauseResponse, operation_id="pause_item") -@router.post("/pause", response_model=PauseResponse, operation_id="pause_items") -async def pause_items(request: Request, ids: str = None, db: Session = Depends(get_db)): - """Pause media items from being processed""" - item_ids = handle_ids(ids) - - items = db.execute( - select(MediaItem).where(MediaItem.id.in_(item_ids)) - ).unique().scalars().all() - - if not items: - raise HTTPException(status_code=404, detail="No items found") - +# Pause-related endpoints (must come before generic /{id} routes) +@router.get("/paused", response_model=AllPausedResponse) +async def get_all_paused( + type: Optional[str] = None, + session: Session = Depends(get_db) +) -> AllPausedResponse: + """Get all paused items""" try: - for item in items: - item.pause() - db.commit() + # Build base query with explicit joins + query = ( + select(MediaItem) + .outerjoin(Movie, Movie.id == MediaItem.id) + .outerjoin(Show, Show.id == MediaItem.id) + .outerjoin(Season, Season.id == MediaItem.id) + .outerjoin(Episode, Episode.id == MediaItem.id) + .distinct() + .where(MediaItem.is_paused == True) + ) + + if type: + type_lower = type.lower() + valid_types = [t.value.lower() for t in MediaType] + if type_lower not in valid_types: + raise HTTPException( + status_code=400, + detail=f"Invalid type. Must be one of: {', '.join(valid_types)}" + ) + query = query.where(func.lower(MediaItem.type) == type_lower) + + logger.debug(f"Executing query: {query}") + logger.debug(f"Query parameters: {query.compile().params}") + + result = session.execute(query).scalars().unique().all() + logger.debug(f"Found {len(result)} paused items") + + # Add debug logging for each item + for item in result: + logger.debug(f"Paused item: id={item.id}, type={item.type}, is_paused={item.is_paused}, class={item.__class__.__name__}") + logger.debug(f"Item dict before conversion: {vars(item)}") + + items = [item.to_dict() for item in result] + logger.debug(f"Converted items to dict: {items}") + + return AllPausedResponse( + count=len(result), + items=items + ) except SQLAlchemyError as e: - db.rollback() - logger.error(f"Failed to pause items: {str(e)}") - raise HTTPException(status_code=500, detail="Failed to pause items") - - return { - "message": f"Successfully paused {len(items)} items", - "ids": item_ids - } + logger.error(f"Database error: {e}") + raise HTTPException( + status_code=500, + detail=f"Database error occurred: {str(e)}" + ) -@router.post("/{id}/unpause", response_model=PauseResponse, operation_id="unpause_item") -@router.post("/unpause", response_model=PauseResponse, operation_id="unpause_items") -async def unpause_items(request: Request, ids: str = None, db: Session = Depends(get_db)): - """Unpause media items to resume processing""" - item_ids = handle_ids(ids) - - items = db.execute( - select(MediaItem).where(MediaItem.id.in_(item_ids)) - ).unique().scalars().all() - - if not items: - raise HTTPException(status_code=404, detail="No items found") +@router.get("/paused/count", response_model=dict) +async def get_paused_count( + type: Optional[str] = None, + session: Session = Depends(get_db) +) -> dict: + """Get count of paused items""" try: - for item in items: - item.unpause() - db.commit() + # Use same explicit join structure + query = ( + select(func.count(func.distinct(MediaItem.id))) + .select_from(MediaItem) + .outerjoin(Movie, Movie.id == MediaItem.id) + .outerjoin(Show, Show.id == MediaItem.id) + .outerjoin(Season, Season.id == MediaItem.id) + .outerjoin(Episode, Episode.id == MediaItem.id) + .where(MediaItem.is_paused == True) + ) + + if type: + type_lower = type.lower() + query = query.where(func.lower(MediaItem.type) == type_lower) + + logger.debug(f"Executing count query: {query}") + count = session.execute(query).scalar() + logger.debug(f"Found {count} paused items") + + return {"count": count} except SQLAlchemyError as e: - db.rollback() - logger.error(f"Failed to unpause items: {str(e)}") - raise HTTPException(status_code=500, detail="Failed to unpause items") - - return { - "message": f"Successfully unpaused {len(items)} items", - "ids": item_ids - } + logger.error(f"Database error: {e}") + raise HTTPException( + status_code=500, + detail=f"Database error occurred: {str(e)}" + ) -@router.get("/{id}/pause", response_model=PauseStateResponse, operation_id="get_pause_state") -async def get_pause_state(request: Request, id: str, db: Session = Depends(get_db)): +@router.get("/pause/{id}", response_model=PauseStateResponse) +async def get_pause_state( + id: str, + session: Session = Depends(get_db) +) -> PauseStateResponse: """Check if a media item is paused""" - item = db.execute( - select(MediaItem).where(MediaItem.id == id) - ).unique().scalar_one_or_none() - - if not item: - raise HTTPException(status_code=404, detail="Item not found") - - return { - "is_paused": item.is_paused, - "paused_at": str(item.paused_at) if item.paused_at else None, - "item_id": item.id, - "title": item.title - } + try: + item = session.execute( + select(MediaItem).where(MediaItem.id == id) + ).scalar_one_or_none() + + if not item: + raise HTTPException(status_code=404, detail=f"Item {id} not found") + + return PauseStateResponse( + is_paused=item.is_paused, + paused_at=str(item.paused_at) if item.paused_at else None, + item_id=item.id, + title=item.title + ) + except SQLAlchemyError as e: + logger.error(f"Database error: {e}") + raise HTTPException( + status_code=500, + detail=f"Database error occurred: {str(e)}" + ) -@router.get("/paused", response_model=AllPausedResponse, operation_id="get_all_paused", responses={200: {"description": "Success, even if no items found"}}, response_model_exclude_none=True) -async def get_all_paused( - request: Request, - type: Optional[str] = None, - db: Session = Depends(get_db) -): - """Get all paused media items, optionally filtered by type""" - # Use raw SQL to verify data - query = select( - MediaItem.id, - MediaItem.title, - MediaItem.type, - MediaItem.is_paused, - MediaItem.paused_at - ).where(MediaItem.is_paused.is_(True)) - - if type: - valid_types = [t.value for t in MediaType] - if type not in valid_types: - raise HTTPException( - status_code=400, - detail=f"Invalid type. Must be one of: {', '.join(valid_types)}" - ) - query = query.where(MediaItem.type == type) - - logger.debug(f"Executing query: {str(query)}") - result = db.execute(query) - rows = result.fetchall() - logger.debug(f"Raw SQL found {len(rows)} rows") - - # Map results to response model - items = [ - { - "id": row.id, - "title": row.title, - "type": row.type, - "paused_at": str(row.paused_at) if row.paused_at else None - } - for row in rows - ] - - return AllPausedResponse(count=len(items), items=items) +@router.post("/{id}/pause", response_model=PauseResponse) +@router.post("/pause", response_model=PauseResponse) +async def pause_items( + request: Request, + id: Optional[str] = None, # Path parameter + ids: Optional[str] = None, # Query parameter + session: Session = Depends(get_db) +) -> PauseResponse: + """Pause one or more media items""" + try: + item_ids = [] + if id: + item_ids = [id] + elif ids: + item_ids = [i.strip() for i in ids.split(",") if i.strip()] + + if not item_ids: + raise HTTPException(status_code=400, detail="No item IDs provided") + + query = select(MediaItem).where(MediaItem.id.in_(item_ids)) + items = session.execute(query).scalars().all() + + found_ids = {item.id for item in items} + missing_ids = set(item_ids) - found_ids + + if not items: + raise HTTPException(status_code=404, detail="No items found") + + for item in items: + if not item.is_paused: # Only pause if not already paused + item.is_paused = True + item.paused_at = datetime.utcnow() + item.paused_by = request.state.user.username if hasattr(request.state, 'user') else None + + session.commit() + + message = f"Successfully paused {len(items)} items" + if missing_ids: + message += f". {len(missing_ids)} items not found: {', '.join(missing_ids)}" + + return PauseResponse( + message=message, + ids=list(found_ids) + ) + except SQLAlchemyError as e: + session.rollback() + logger.error(f"Database error: {e}") + raise HTTPException( + status_code=500, + detail=f"Database error occurred: {str(e)}" + ) -@router.get("/paused/count", response_model=dict, operation_id="get_paused_count") -async def get_paused_count( +@router.post("/{id}/unpause", response_model=PauseResponse) +@router.post("/unpause", response_model=PauseResponse) +async def unpause_items( request: Request, - type: Optional[str] = None, - db: Session = Depends(get_db) -): - """Get total count of paused items, optionally filtered by type""" + id: Optional[str] = None, # Path parameter + ids: Optional[str] = None, # Query parameter + session: Session = Depends(get_db) +) -> PauseResponse: + """Unpause one or more media items""" try: - # Build query using SQLAlchemy - query = select(func.count(MediaItem.id)).where(MediaItem.is_paused.is_(True)) + item_ids = [] + if id: + item_ids = [id] + elif ids: + item_ids = [i.strip() for i in ids.split(",") if i.strip()] - if type: - valid_types = [t.value for t in MediaType] - if type not in valid_types: - raise HTTPException( - status_code=400, - detail=f"Invalid type. Must be one of: {', '.join(valid_types)}" - ) - query = query.where(MediaItem.type == type) + if not item_ids: + raise HTTPException(status_code=400, detail="No item IDs provided") + + query = select(MediaItem).where(MediaItem.id.in_(item_ids)) + items = session.execute(query).scalars().all() - logger.debug(f"Executing count query: {str(query)}") - count = db.scalar(query) or 0 # Default to 0 if None - logger.debug(f"Found {count} paused items") + found_ids = {item.id for item in items} + missing_ids = set(item_ids) - found_ids - return { - "total": count, - "type": type if type else "all" - } + if not items: + raise HTTPException(status_code=404, detail="No items found") + + for item in items: + item.is_paused = False + item.unpaused_at = datetime.utcnow() + + session.commit() + + message = f"Successfully unpaused {len(items)} items" + if missing_ids: + message += f". {len(missing_ids)} items not found: {', '.join(missing_ids)}" + + return PauseResponse( + message=message, + ids=list(found_ids) + ) except SQLAlchemyError as e: - logger.error(f"Error in get_paused_count: {str(e)}") - # Return 0 count on error - return { - "total": 0, - "type": type if type else "all", - "error": "Failed to get count" - } + session.rollback() + logger.error(f"Database error: {e}") + raise HTTPException( + status_code=500, + detail=f"Database error occurred: {str(e)}" + ) + +# Generic item routes (must come after specific routes) +@router.get( + "/{id}", + summary="Retrieve Media Item", + description="Fetch a single media item by ID", + operation_id="get_item", + response_model=ItemsResponse +) +async def get_item( + id: str, + session: Session = Depends(get_db) +) -> ItemsResponse: + """Get a specific media item""" + try: + query = ( + select(MediaItem) + .where(MediaItem.id == id) + ) + + item = session.execute(query).scalar_one_or_none() + + if not item: + raise HTTPException(status_code=404, detail=f"Item {id} not found") + + return ItemsResponse( + success=True, + items=[item.to_dict()], + page=1, + limit=1, + total_items=1, + total_pages=1 + ) + except SQLAlchemyError as e: + logger.error(f"Database error: {e}") + raise HTTPException( + status_code=500, + detail=f"Database error occurred: {str(e)}" + ) From 6e1aed6e60377f4c2687b9f5901fb9f6dbf73df5 Mon Sep 17 00:00:00 2001 From: Arrrrr Date: Sat, 30 Nov 2024 19:07:46 -0800 Subject: [PATCH 4/4] Update item.py --- src/program/media/item.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/program/media/item.py b/src/program/media/item.py index 6bd67b2b..65f5ee60 100644 --- a/src/program/media/item.py +++ b/src/program/media/item.py @@ -413,7 +413,7 @@ def pause(self) -> None: logger.debug(f"{self.log_string} is already paused") return - logger.debug(f"Pausing {self.id}") + logger.debug(f"Pausing {self.log_string} (ID: {self.id}, State: {self.state.name})") try: self.is_paused = True self.paused_at = datetime.now()