
refactor: simplify downloaders
chore: option to manually trigger dev build
refactor: rework /scrape to enable manual scraping via ui
feat: stream management endpoints
chore: refactor alldebrid, untested
Gaisberg authored and Gaisberg committed Nov 1, 2024
1 parent 18cfa3b commit d75149e
Showing 13 changed files with 1,020 additions and 1,112 deletions.
1 change: 1 addition & 0 deletions .github/workflows/docker-build-dev.yml
@@ -4,6 +4,7 @@ on:
push:
branches:
- main
workflow_dispatch:

jobs:
build-and-push-dev:
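The added workflow_dispatch: trigger is what makes the dev build manually runnable outside of a push. A minimal sketch of kicking it off through GitHub's REST API, where OWNER/REPO and the token are placeholders rather than values from this commit:

# Sketch: manually trigger the dev build via GitHub's REST API.
# OWNER/REPO and <token> are placeholders; the token needs the workflow scope.
import requests

resp = requests.post(
    "https://api.github.com/repos/OWNER/REPO/actions/workflows/docker-build-dev.yml/dispatches",
    headers={
        "Authorization": "Bearer <token>",
        "Accept": "application/vnd.github+json",
    },
    json={"ref": "main"},  # branch to run the workflow against
)
resp.raise_for_status()  # GitHub returns 204 No Content on success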
7 changes: 7 additions & 0 deletions src/program/db/db.py
@@ -28,6 +28,13 @@
db_host = settings_manager.settings.database.host
db = SQLAlchemy(db_host, engine_options=engine_options)

def get_db():
_db = db.Session()
try:
yield _db
finally:
_db.close()

script_location = data_dir_path / "alembic/"

if not os.path.exists(script_location):
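The new get_db generator is the usual session-per-request dependency pattern: yield a session, then close it in the finally block. A minimal usage sketch, assuming a FastAPI router; the /health/db route is illustrative, not part of this commit:

# Sketch: consuming get_db as a FastAPI dependency (route is hypothetical).
from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.orm import Session

from program.db.db import get_db

router = APIRouter()

@router.get("/health/db")
def db_health(session: Session = Depends(get_db)):
    # Each request gets its own session; get_db's finally block closes it.
    session.execute(text("SELECT 1"))
    return {"db": "ok"}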
42 changes: 41 additions & 1 deletion src/program/db/db_functions.py
@@ -235,6 +235,40 @@ def blacklist_stream(item: "MediaItem", stream: Stream, session: Session = None)
if close_session:
session.close()

def unblacklist_stream(item: "MediaItem", stream: Stream, session: Session = None) -> bool:
close_session = False
if session is None:
session = db.Session()
item = session.execute(select(type(item)).where(type(item)._id == item._id)).unique().scalar_one()
close_session = True

try:
item = session.merge(item)
association_exists = session.query(
session.query(StreamBlacklistRelation)
.filter(StreamBlacklistRelation.media_item_id == item._id)
.filter(StreamBlacklistRelation.stream_id == stream._id)
.exists()
).scalar()

if association_exists:
session.execute(
delete(StreamBlacklistRelation)
.where(StreamBlacklistRelation.media_item_id == item._id)
.where(StreamBlacklistRelation.stream_id == stream._id)
)
session.execute(
insert(StreamRelation)
.values(parent_id=item._id, child_id=stream._id)
)
item.store_state()
session.commit()
return True
return False
finally:
if close_session:
session.close()

def get_stream_count(media_item_id: int) -> int:
    """Get the count of streams for a given MediaItem."""
    from program.media.item import MediaItem
@@ -365,7 +399,13 @@ def run_thread_with_db_item(fn, service, program, input_id, cancellation_event:
program.em.remove_id_from_queues(input_item._id)

if not cancellation_event.is_set():
input_item.store_state()
# Update parent item
if input_item.type == "episode":
input_item.parent.parent.store_state()
elif input_item.type == "season":
input_item.parent.store_state()
else:
input_item.store_state()
session.commit()

session.expunge_all()
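unblacklist_stream is the counterpart to blacklist_stream: it deletes the StreamBlacklistRelation row and re-inserts a StreamRelation so the stream becomes eligible again. A sketch of the kind of stream management endpoint this enables; the route shape and lookups are illustrative, and only unblacklist_stream comes from this commit:

# Sketch: a hypothetical endpoint built on unblacklist_stream.
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.orm import Session

from program.db.db import get_db
from program.db.db_functions import unblacklist_stream
from program.media.item import MediaItem
from program.media.stream import Stream

router = APIRouter(prefix="/items")

@router.post("/{item_id}/streams/{stream_id}/unblacklist")
def unblacklist(item_id: int, stream_id: int, session: Session = Depends(get_db)):
    item = session.execute(select(MediaItem).where(MediaItem._id == item_id)).unique().scalar_one_or_none()
    stream = session.execute(select(Stream).where(Stream._id == stream_id)).scalar_one_or_none()
    if item is None or stream is None:
        raise HTTPException(status_code=404, detail="Item or stream not found")
    if not unblacklist_stream(item, stream, session):
        raise HTTPException(status_code=400, detail="Stream was not blacklisted")
    return {"success": True}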
210 changes: 70 additions & 140 deletions src/program/downloaders/__init__.py
@@ -1,12 +1,13 @@
from concurrent.futures import CancelledError, ThreadPoolExecutor, as_completed

from program.media.item import MediaItem
from program.media.state import States
from program.media.stream import Stream
from program.settings.manager import settings_manager
from loguru import logger

from .alldebrid import AllDebridDownloader
from .realdebrid import RealDebridDownloader
from .shared import get_needed_media
from .torbox import TorBoxDownloader


@@ -19,7 +20,7 @@ def __init__(self):
)
self.services = {
RealDebridDownloader: RealDebridDownloader(),
AllDebridDownloader: AllDebridDownloader(),
# AllDebridDownloader: AllDebridDownloader(),
# TorBoxDownloader: TorBoxDownloader()
}
self.service = next(
@@ -38,150 +39,79 @@ def validate(self):

def run(self, item: MediaItem):
logger.debug(f"Running downloader for {item.log_string}")
needed_media = get_needed_media(item)
hashes = [
stream.infohash
for stream in item.streams
if stream.infohash not in self.service.existing_hashes
]
cached_streams = self.get_cached_streams(hashes, needed_media)
if len(cached_streams) > 0:
item.active_stream = cached_streams[0]

for stream in item.streams:
torrent_id = None
try:
self.download(item, item.active_stream)
torrent_id = self.download_cached_stream(item, stream)
if torrent_id:
break
except Exception as e:
logger.error(f"Failed to download {item.log_string}: {e}")
if item.active_stream.get("infohash", None):
self._delete_and_reset_active_stream(item)
else:
for stream in item.streams:
if torrent_id:
self.service.delete_torrent(torrent_id)
logger.debug(f"Blacklisting {stream.raw_title} for {item.log_string}, reason: {e}")
item.blacklist_stream(stream)
logger.log("DEBRID", f"No cached torrents found for {item.log_string}")
yield item

def _delete_and_reset_active_stream(self, item: MediaItem):
try:
self.service.existing_hashes.remove(item.active_stream["infohash"])
self.service.delete_torrent_with_infohash(item.active_stream["infohash"])
stream = next(
(
stream
for stream in item.streams
if stream.infohash == item.active_stream["infohash"]
),
None,
)
if stream:
item.blacklist_stream(stream)
except Exception as e:
logger.debug(
f"Failed to delete and reset active stream for {item.log_string}: {e}"
)
item.active_stream = {}

def get_cached_streams(
self, hashes: list[str], needed_media, break_on_first=True
) -> dict:
chunks = [hashes[i : i + 5] for i in range(0, len(hashes), 5)]
# Use a list to share state across threads, since booleans are immutable
break_pointer = [False, break_on_first]
results = []
priority_index = 0

with ThreadPoolExecutor(
thread_name_prefix="Downloader", max_workers=4
) as executor:
futures = []

def cancel_all():
for f in futures:
f.cancel()

for chunk in chunks:
future = executor.submit(
self.service.process_hashes, chunk, needed_media, break_pointer
)
futures.append(future)

for future in as_completed(futures):
try:
_result = future.result()
except CancelledError:
continue
for infohash, container in _result.items():
result = {"infohash": infohash, **container}
# Cached
if container.get("matched_files", False):
results.append(result)
if break_on_first and self.speed_mode:
cancel_all()
return results
elif infohash == hashes[priority_index] and break_on_first:
results = [result]
cancel_all()
return results
# Uncached
elif infohash == hashes[priority_index]:
priority_index += 1

results.sort(key=lambda x: hashes.index(x["infohash"]))
return results

def download(self, item, active_stream: dict) -> str:
torrent_id = self.service.download_cached(active_stream)
torrent_names = self.service.get_torrent_names(torrent_id)
update_item_attributes(item, torrent_names)
logger.log(
"DEBRID",
f"Downloaded {item.log_string} from '{item.active_stream['name']}' [{item.active_stream['infohash']}]",
)

def add_torrent(self, infohash: str):
    def download_cached_stream(self, item: MediaItem, stream: Stream) -> int:
torrent_id = None
cached_containers = self.get_instant_availability([stream.infohash]).get(stream.infohash, None)
if not cached_containers:
raise Exception("Not cached!")
the_container = cached_containers[0]
torrent_id = self.add_torrent(stream.infohash)
info = self.get_torrent_info(torrent_id)
self.select_files(torrent_id, the_container.keys())
if not self.update_item_attributes(item, info, the_container):
raise Exception("No matching files found!")
logger.info(f"Downloaded {item.log_string} from '{stream.raw_title}' [{stream.infohash}]")
return torrent_id

def get_instant_availability(self, infohashes: list[str]) -> dict[str, list[dict]]:
return self.service.get_instant_availability(infohashes)

def add_torrent(self, infohash: str) -> int:
return self.service.add_torrent(infohash)

def add_torrent_magnet(self, magnet_link: str):
return self.service.add_torrent_magnet(magnet_link)

def get_torrent_info(self, torrent_id: str):
def get_torrent_info(self, torrent_id: int):
return self.service.get_torrent_info(torrent_id)


def update_item_attributes(item: MediaItem, names: tuple[str, str]):
"""Update the item attributes with the downloaded files and active stream"""
matches_dict = item.active_stream.get("matched_files")
item.folder = names[0]
item.alternative_folder = names[1]
stream = next(
(
stream
for stream in item.streams
if stream.infohash == item.active_stream["infohash"]
),
None,
)
item.active_stream["name"] = stream.raw_title

if item.type in ["movie", "episode"]:
item.file = next(
file["filename"] for file in next(iter(matches_dict.values())).values()
)
elif item.type == "show":
for season in item.seasons:
for episode in season.episodes:
file = matches_dict.get(season.number, {}).get(episode.number, {})
if file:
episode.file = file["filename"]
episode.folder = item.folder
episode.alternative_folder = item.alternative_folder
episode.active_stream = {
**item.active_stream,
"files": [episode.file],
}
elif item.type == "season":
for episode in item.episodes:
file = matches_dict.get(item.number, {}).get(episode.number, {})
if file:
episode.file = file["filename"]
episode.folder = item.folder
episode.alternative_folder = item.alternative_folder
episode.active_stream = {**item.active_stream, "files": [episode.file]}
def select_files(self, torrent_id, container):
self.service.select_files(torrent_id, container)

def delete_torrent(self, torrent_id):
self.service.delete_torrent(torrent_id)

def update_item_attributes(self, item: MediaItem, info, container) -> bool:
"""Update the item attributes with the downloaded files and active stream"""
found = False
for file in container.values():
if item.type == "movie" and self.service.file_finder.container_file_matches_movie(file):
item.file = file[self.service.file_finder.filename_attr]
item.folder = info["filename"]
item.alternative_folder = info["original_filename"]
item.active_stream = {"infohash": info["hash"], "id": info["id"]}
found = True
break
if item.type in ["show", "season", "episode"]:
show = item
if item.type == "season":
show = item.parent
elif item.type == "episode":
show = item.parent.parent
file_season, file_episodes = self.service.file_finder.container_file_matches_episode(file)
if file_season and file_episodes:
season = next((season for season in show.seasons if season.number == file_season), None)
for file_episode in file_episodes:
episode = next((episode for episode in season.episodes if episode.number == file_episode), None)
if episode and episode.state not in [States.Completed, States.Symlinked, States.Downloaded]:
episode.file = file[self.service.file_finder.filename_attr]
episode.folder = info["filename"]
episode.alternative_folder = info["original_filename"]
episode.active_stream = {"infohash": info["hash"], "id": info["id"]}
# We have to make sure the episode is correct if item is an episode
if item.type != "episode" or (item.type == "episode" and episode.number == item.number):
found = True
return found
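Because the diff interleaves removed and added lines, the new control flow of run() is easier to read in isolation. A minimal sketch of the per-stream retry loop this commit introduces, assuming a Downloader instance and an item with a streams list; the error handling mirrors the diff above:

# Sketch: the per-stream retry flow, extracted for readability.
def try_cached_streams(downloader, item):
    """Try each stream in order; blacklist and clean up the ones that fail."""
    for stream in item.streams:
        torrent_id = None
        try:
            # Raises if the stream is not cached or no files match.
            torrent_id = downloader.download_cached_stream(item, stream)
            if torrent_id:
                return torrent_id
        except Exception:
            if torrent_id:
                # Remove the partially added torrent before moving on.
                downloader.delete_torrent(torrent_id)
            item.blacklist_stream(stream)
    return None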
