From fe4e3c567921a6a64363e1ca7ebc9414c32f4339 Mon Sep 17 00:00:00 2001 From: Svinokur Date: Mon, 26 Apr 2021 18:48:40 +0300 Subject: [PATCH] see version 1.8.0 changelog --- .gitignore | 1 + changelog.txt | 8 + selenium_driver_updater/chromeDriver.py | 95 +++----- selenium_driver_updater/edgeDriver.py | 94 +++----- selenium_driver_updater/geckoDriver.py | 219 +++++++----------- selenium_driver_updater/operaDriver.py | 96 ++++---- selenium_driver_updater/util/__init__.py | 0 selenium_driver_updater/util/extractor.py | 141 +++++++++++ selenium_driver_updater/util/github_viewer.py | 155 +++++++++++++ setup.py | 13 +- 10 files changed, 506 insertions(+), 316 deletions(-) create mode 100644 selenium_driver_updater/util/__init__.py create mode 100644 selenium_driver_updater/util/extractor.py create mode 100644 selenium_driver_updater/util/github_viewer.py diff --git a/.gitignore b/.gitignore index ca71ea0..422cef3 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ selenium_driver_updater/__pycache__/*.pyc selenium_driver_updater/test/__pycache__/*.pyc .DS_Store selenium_driver_updater/*.log +selenium_driver_updater/util/__pycache__/*.pyc diff --git a/changelog.txt b/changelog.txt index d80a8bd..9b4113b 100644 --- a/changelog.txt +++ b/changelog.txt @@ -1,3 +1,11 @@ +[1.8.0] 26/04/2021 +This version provides minor fixes. +This code was written and tested on Python 3.9.4 + +# Improvements + +- Improved functions decomposition. + [1.7.1] 26/04/2021 This version provides minor fixes. This code was written and tested on Python 3.9.4 diff --git a/selenium_driver_updater/chromeDriver.py b/selenium_driver_updater/chromeDriver.py index 98c6cce..dbb6342 100644 --- a/selenium_driver_updater/chromeDriver.py +++ b/selenium_driver_updater/chromeDriver.py @@ -3,7 +3,6 @@ import os import traceback import logging -import zipfile import time import stat import os @@ -23,9 +22,7 @@ from selenium.common.exceptions import SessionNotCreatedException from selenium.common.exceptions import WebDriverException -from shutil import copyfile - -import shutil +from util.extractor import Extractor class ChromeDriver(): @@ -73,6 +70,8 @@ def __init__(self, path : str, upgrade : bool, chmod : bool, check_driver_is_up_ self.version = version + self.extractor = Extractor + def __get_latest_version_chrome_driver(self) -> Tuple[bool, str, str]: """Gets latest chromedriver version @@ -182,14 +181,24 @@ def __get_latest_chromedriver_for_current_os(self, latest_version : str) -> Tupl time.sleep(2) if not self.filename: - - with zipfile.ZipFile(file_name, 'r') as zip_ref: - zip_ref.extractall(self.path) + + archive_path = file_name + out_path = self.path + result, message = self.extractor.extract_all_zip_archive(archive_path=archive_path, out_path=out_path) + if not result: + logging.error(message) + return result, message, file_name else: - result, message = self.__rename_driver(file_name=file_name) + archive_path = file_name + out_path = self.path + filename = setting['ChromeDriver']['LastReleasePlatform'] + filename_replace = self.filename + result, message = self.extractor.extract_all_zip_archive_with_specific_name(archive_path=archive_path, + out_path=out_path, filename=filename, filename_replace=filename_replace) if not result: + logging.error(message) return result, message, file_name time.sleep(3) @@ -205,7 +214,6 @@ def __get_latest_chromedriver_for_current_os(self, latest_version : str) -> Tupl result_run = True except: - message_run = f'Unexcepted error: {str(traceback.format_exc())}' logging.error(message_run) @@ -388,59 +396,6 @@ def __compare_current_version_and_latest_version(self) -> Tuple[bool, str, bool, return result_run, message_run, is_driver_up_to_date, current_version, latest_version - def __rename_driver(self, file_name : str) -> Tuple[bool, str]: - """Renames chromedriver if it was given - - Args: - file_name (str) : Path to the chromedriver - - Returns: - Tuple of bool, str and bool - - result_run (bool) : True if function passed correctly, False otherwise. - message_run (str) : Empty string if function passed correctly, non-empty string if error. - - Raises: - Except: If unexpected error raised. - - """ - result_run : bool = False - message_run : str = '' - renamed_driver_path : str = '' - - try: - - driver_folder_path = self.path + ChromeDriver._tmp_folder_path - logging.info(f'Created new directory for replacing name for chromedriver path: {driver_folder_path}') - - if os.path.exists(driver_folder_path): - shutil.rmtree(driver_folder_path) - - with zipfile.ZipFile(file_name, 'r') as zip_ref: - zip_ref.extractall(driver_folder_path) - - old_chromedriver_path = driver_folder_path + os.path.sep + setting['ChromeDriver']['LastReleasePlatform'] - new_chromedriver_path = driver_folder_path + os.path.sep + self.filename - - os.rename(old_chromedriver_path, new_chromedriver_path) - - renamed_driver_path = self.path + self.filename - if os.path.exists(renamed_driver_path): - os.remove(renamed_driver_path) - - copyfile(new_chromedriver_path, renamed_driver_path) - - if os.path.exists(driver_folder_path): - shutil.rmtree(driver_folder_path) - - result_run = True - - except: - message_run = f'Unexcepted error: {str(traceback.format_exc())}' - logging.error(message_run) - - return result_run, message_run - def __chmod_driver(self) -> Tuple[bool, str]: """Tries to give chromedriver needed permissions @@ -617,13 +572,23 @@ def __get_specific_version_chromedriver_for_current_os(self, version : str) -> T if not self.filename: - with zipfile.ZipFile(file_name, 'r') as zip_ref: - zip_ref.extractall(self.path) + archive_path = file_name + out_path = self.path + result, message = self.extractor.extract_all_zip_archive(archive_path=archive_path, out_path=out_path) + if not result: + logging.error(message) + return result, message, file_name else: - result, message = self.__rename_driver(file_name=file_name) + archive_path = file_name + out_path = self.path + filename = setting['ChromeDriver']['LastReleasePlatform'] + filename_replace = self.filename + result, message = self.extractor.extract_all_zip_archive_with_specific_name(archive_path=archive_path, + out_path=out_path, filename=filename, filename_replace=filename_replace) if not result: + logging.error(message) return result, message, file_name time.sleep(3) diff --git a/selenium_driver_updater/edgeDriver.py b/selenium_driver_updater/edgeDriver.py index 44a689c..77f7162 100644 --- a/selenium_driver_updater/edgeDriver.py +++ b/selenium_driver_updater/edgeDriver.py @@ -26,9 +26,7 @@ import stat -import zipfile - -from shutil import copyfile +from util.extractor import Extractor class EdgeDriver(): @@ -76,6 +74,8 @@ def __init__(self, path : str, upgrade : bool, chmod : bool, check_driver_is_up_ self.version = version + self.extractor = Extractor + def __get_current_version_edgedriver_selenium(self) -> Tuple[bool, str, str]: """Gets current edgedriver version @@ -258,13 +258,25 @@ def __get_latest_edgedriver_for_current_os(self, latest_version : str) -> Tuple[ if not self.filename: - with zipfile.ZipFile(file_name, 'r') as zip_ref: - zip_ref.extractall(self.path) + archive_path = file_name + out_path = self.path + + result, message = self.extractor.extract_all_zip_archive(archive_path=archive_path, out_path=out_path) + if not result: + logging.error(message) + return result, message, file_name else: - result, message = self.__rename_driver(file_name=file_name) + archive_path = file_name + out_path = self.path + filename = setting['EdgeDriver']['LastReleasePlatform'] + filename_replace = self.filename + + result, message = self.extractor.extract_all_zip_archive_with_specific_name(archive_path=archive_path, + out_path=out_path, filename=filename, filename_replace=filename_replace) if not result: + logging.error(message) return result, message, file_name time.sleep(3) @@ -405,59 +417,6 @@ def __compare_current_version_and_latest_version(self) -> Tuple[bool, str, bool, return result_run, message_run, is_driver_up_to_date, current_version, latest_version - def __rename_driver(self, file_name : str) -> Tuple[bool, str]: - """Renames edgedriver if it was given - - Args: - file_name (str) : Path to the edgedriver - - Returns: - Tuple of bool, str and bool - - result_run (bool) : True if function passed correctly, False otherwise. - message_run (str) : Empty string if function passed correctly, non-empty string if error. - - Raises: - Except: If unexpected error raised. - - """ - result_run : bool = False - message_run : str = '' - renamed_driver_path : str = '' - - try: - - driver_folder_path = self.path + EdgeDriver._tmp_folder_path - logging.info(f'Created new directory for replacing name for edgedriver path: {driver_folder_path}') - - if os.path.exists(driver_folder_path): - shutil.rmtree(driver_folder_path) - - with zipfile.ZipFile(file_name, 'r') as zip_ref: - zip_ref.extractall(driver_folder_path) - - old_edgedriver_path = driver_folder_path + os.path.sep + setting['EdgeDriver']['LastReleasePlatform'] - new_edgedriver_path = driver_folder_path + os.path.sep + self.filename - - os.rename(old_edgedriver_path, new_edgedriver_path) - - renamed_driver_path = self.path + self.filename - if os.path.exists(renamed_driver_path): - os.remove(renamed_driver_path) - - copyfile(new_edgedriver_path, renamed_driver_path) - - if os.path.exists(driver_folder_path): - shutil.rmtree(driver_folder_path) - - result_run = True - - except: - message_run = f'Unexcepted error: {str(traceback.format_exc())}' - logging.error(message_run) - - return result_run, message_run - def __chmod_driver(self) -> Tuple[bool, str]: """Tries to give edgedriver needed permissions @@ -629,13 +588,24 @@ def __get_specific_version_edgedriver_for_current_os(self, version : str) -> Tup if not self.filename: - with zipfile.ZipFile(file_name, 'r') as zip_ref: - zip_ref.extractall(self.path) + archive_path = file_name + out_path = self.path + result, message = self.extractor.extract_all_zip_archive(archive_path=archive_path, out_path=out_path) + if not result: + logging.error(message) + return result, message, file_name else: - result, message = self.__rename_driver(file_name=file_name) + archive_path = file_name + out_path = self.path + filename = setting['EdgeDriver']['LastReleasePlatform'] + filename_replace = self.filename + + result, message = self.extractor.extract_all_zip_archive_with_specific_name(archive_path=archive_path, + out_path=out_path, filename=filename, filename_replace=filename_replace) if not result: + logging.error(message) return result, message, file_name time.sleep(3) diff --git a/selenium_driver_updater/geckoDriver.py b/selenium_driver_updater/geckoDriver.py index 57850ed..d3abdbf 100644 --- a/selenium_driver_updater/geckoDriver.py +++ b/selenium_driver_updater/geckoDriver.py @@ -1,6 +1,5 @@ import json from selenium import webdriver -import requests import wget import os import traceback @@ -16,8 +15,6 @@ import requests -import tarfile - import sys import os.path sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__)))) @@ -26,16 +23,18 @@ import platform -import shutil -from shutil import copyfile - import stat -import zipfile +from util.extractor import Extractor + +try: + from util.github_viewer import GithubViewer +except: + from selenium_driver_updater.selenium_driver_updater.util.github_viewer import GithubViewer class GeckoDriver(): - _tmp_folder_path = 'tmp' + _repo_name = 'mozilla/geckodriver' def __init__(self, path : str, upgrade : bool, chmod : bool, check_driver_is_up_to_date : bool, info_messages : bool, filename : str, version : str): @@ -79,6 +78,10 @@ def __init__(self, path : str, upgrade : bool, chmod : bool, check_driver_is_up_ self.version = version + self.extractor = Extractor + + self.github_viewer = GithubViewer + def __get_current_version_geckodriver_selenium(self) -> Tuple[bool, str, str]: """Gets current geckodriver version @@ -157,10 +160,10 @@ def __get_latest_version_geckodriver(self) -> Tuple[bool, str, str]: try: - url = self.setting['GeckoDriver']['LinkLastRelease'] - request = requests.get(url=url, headers=self.headers) - request_text = request.text - json_data = json.loads(str(request_text)) + repo_name = GeckoDriver._repo_name + result, message, json_data = self.github_viewer.get_latest_release_data_by_repo_name(repo_name=repo_name) + if not result: + return result, message, latest_version latest_version = json_data.get('name') @@ -225,33 +228,24 @@ def __get_latest_geckodriver_for_current_os(self) -> Tuple[bool, str, str]: result_run : bool = False message_run : str = '' file_name : str = '' - filename_git : str = '' + asset_name : str = '' url : str = '' geckodriver_version : str = '' try: - url = self.setting['GeckoDriver']['LinkLastRelease'] - request = requests.get(url=url, headers=self.headers) - request_text = request.text - json_data = json.loads(str(request_text)) - - geckodriver_version = json_data.get('name') - - for asset in json_data.get('assets'): - if self.setting['GeckoDriver']['LinkLastReleasePlatform'] in asset.get('name'): - filename_git = asset.get('name') - url = asset.get('browser_download_url') - break - - if not filename_git: - message = (f"Geckodriver binary was not found, maybe unknown OS " - f"LinkLastReleasePlatform: {self.setting['GeckoDriver']['LinkLastReleasePlatform']}" ) - logging.error(message) - return False, message, file_name + repo_name = GeckoDriver._repo_name + asset_name = self.setting['GeckoDriver']['LinkLastReleasePlatform'] + result, message, data = self.github_viewer.get_specific_asset_by_repo_name(repo_name=repo_name, asset_name=asset_name) + if not result: + return result, message, file_name + + geckodriver_version = data[0].get('version') + url = data[0].get('asset').get('browser_download_url') + asset_name = data[0].get('asset').get('name') logging.info(f'Started download geckodriver geckodriver_version: {geckodriver_version}') - out_path = self.path + filename_git + out_path = self.path + asset_name if os.path.exists(out_path): os.remove(out_path) @@ -266,25 +260,39 @@ def __get_latest_geckodriver_for_current_os(self) -> Tuple[bool, str, str]: if not self.filename: - if filename_git.endswith('.tar.gz'): + if asset_name.endswith('.tar.gz'): - with tarfile.open(file_name, "r:gz") as tar: - tar.extractall(self.path) + archive_path = file_name + out_path = self.path + result, message = self.extractor.extract_all_tar_gz_archive(archive_path=archive_path, out_path=out_path) + if not result: + logging.error(message) + return result, message, file_name - elif filename_git.endswith('.zip'): + elif asset_name.endswith('.zip'): - with zipfile.ZipFile(file_name, 'r') as zip_ref: - zip_ref.extractall(self.path) + archive_path = file_name + out_path = self.path + result, message = self.extractor.extract_all_zip_archive(archive_path=archive_path, out_path=out_path) + if not result: + logging.error(message) + return result, message, file_name else: - message = f'Unknown archive format was specified filename: {filename_git}' + message = f'Unknown archive format was specified asset_name: {asset_name}' logging.error(message) return result_run, message, file_name else: - result, message = self.__rename_driver(file_name=file_name,filename_git=filename_git) + archive_path = file_name + out_path = self.path + filename = setting['GeckoDriver']['LastReleasePlatform'] + filename_replace = self.filename + result, message = self.extractor.extract_all_zip_archive_with_specific_name(archive_path=archive_path, + out_path=out_path, filename=filename, filename_replace=filename_replace) if not result: + logging.error(message) return result, message, file_name time.sleep(3) @@ -298,8 +306,6 @@ def __get_latest_geckodriver_for_current_os(self) -> Tuple[bool, str, str]: result_run = True - logging.info(f'Geckodriver was successfully updated to the last version: {geckodriver_version}') - except: message_run = f'Unexcepted error: {str(traceback.format_exc())}' logging.error(message_run) @@ -419,72 +425,6 @@ def __compare_current_version_and_latest_version(self) -> Tuple[bool, str, bool, return result_run, message_run, is_driver_up_to_date, current_version, latest_version - def __rename_driver(self, file_name : str, filename_git : str) -> Tuple[bool, str]: - """Renames geckodriver if it was given - - Args: - file_name (str) : Path to the geckodriver - filename_git (str) : Name of geckodriver archive - - Returns: - Tuple of bool, str and bool - - result_run (bool) : True if function passed correctly, False otherwise. - message_run (str) : Empty string if function passed correctly, non-empty string if error. - - Raises: - Except: If unexpected error raised. - - """ - result_run : bool = False - message_run : str = '' - renamed_driver_path : str = '' - - try: - - driver_folder_path = self.path + GeckoDriver._tmp_folder_path - logging.info(f'Created new directory for replacing name for geckodriver path: {driver_folder_path}') - - if os.path.exists(driver_folder_path): - shutil.rmtree(driver_folder_path) - - if filename_git.endswith('.tar.gz'): - - with tarfile.open(file_name, "r:gz") as tar: - tar.extractall(driver_folder_path) - - elif filename_git.endswith('.zip'): - - with zipfile.ZipFile(file_name, 'r') as zip_ref: - zip_ref.extractall(driver_folder_path) - - else: - message = f'Unknown archive format was specified filename: {filename_git}' - logging.error(message) - return result_run, message - - old_geckodriver_path = driver_folder_path + os.path.sep + setting['GeckoDriver']['LastReleasePlatform'] - new_geckodriver_path = driver_folder_path + os.path.sep + self.filename - - os.rename(old_geckodriver_path, new_geckodriver_path) - - renamed_driver_path = self.path + self.filename - if os.path.exists(renamed_driver_path): - os.remove(renamed_driver_path) - - copyfile(new_geckodriver_path, renamed_driver_path) - - if os.path.exists(driver_folder_path): - shutil.rmtree(driver_folder_path) - - result_run = True - - except: - message_run = f'Unexcepted error: {str(traceback.format_exc())}' - logging.error(message_run) - - return result_run, message_run - def __chmod_driver(self) -> Tuple[bool, str]: """Tries to give geckodriver needed permissions @@ -633,33 +573,24 @@ def __get_specific_version_geckodriver_for_current_os(self, version) -> Tuple[bo result_run : bool = False message_run : str = '' file_name : str = '' - filename_git : str = '' url : str = '' + asset_name : str = '' try: - url = self.setting["GeckoDriver"]["LinkAllReleases"] - request = requests.get(url=url, headers=self.headers) - request_text = request.text - json_data = json.loads(str(request_text)) - - for release in json_data: - if version == release.get('name') or version in release.get('tag_name'): - for asset in release.get('assets'): - if self.setting['GeckoDriver']['LinkLastReleasePlatform'] in asset.get('name'): - filename_git = asset.get('name') - url = asset.get('browser_download_url') - break - - if not filename_git: - message = (f"Geckodriver binary was not found, maybe unknown OS or incorrect version " - f"LinkLastReleasePlatform: {self.setting['GeckoDriver']['LinkLastReleasePlatform']} " - f"specific_version: {version}" ) - logging.error(message) - return False, message, file_name + repo_name = GeckoDriver._repo_name + asset_name = self.setting['GeckoDriver']['LinkLastReleasePlatform'] + version = version + result, message, data = self.github_viewer.get_specific_asset_by_specific_version_by_repo_name(repo_name=repo_name, + asset_name=asset_name, version=version) + if not result: + return result, message, file_name + + url = data[0].get('asset').get('browser_download_url') + asset_name = data[0].get('asset').get('name') logging.info(f'Started download geckodriver specific_version: {version}') - out_path = self.path + filename_git + out_path = self.path + asset_name if os.path.exists(out_path): os.remove(out_path) @@ -674,25 +605,39 @@ def __get_specific_version_geckodriver_for_current_os(self, version) -> Tuple[bo if not self.filename: - if filename_git.endswith('.tar.gz'): + if asset_name.endswith('.tar.gz'): - with tarfile.open(file_name, "r:gz") as tar: - tar.extractall(self.path) + archive_path = file_name + out_path = self.path + result, message = self.extractor.extract_all_tar_gz_archive(archive_path=archive_path, out_path=out_path) + if not result: + logging.error(message) + return result, message, file_name - elif filename_git.endswith('.zip'): + elif asset_name.endswith('.zip'): - with zipfile.ZipFile(file_name, 'r') as zip_ref: - zip_ref.extractall(self.path) + archive_path = file_name + out_path = self.path + result, message = self.extractor.extract_all_zip_archive(archive_path=archive_path, out_path=out_path) + if not result: + logging.error(message) + return result, message, file_name else: - message = f'Unknown archive format was specified filename: {filename_git}' + message = f'Unknown archive format was specified asset_name: {asset_name}' logging.error(message) return result_run, message, file_name else: - result, message = self.__rename_driver(file_name=file_name,filename_git=filename_git) + archive_path = file_name + out_path = self.path + filename = setting['GeckoDriver']['LastReleasePlatform'] + filename_replace = self.filename + result, message = self.extractor.extract_all_zip_archive_with_specific_name(archive_path=archive_path, + out_path=out_path, filename=filename, filename_replace=filename_replace) if not result: + logging.error(message) return result, message, file_name time.sleep(3) diff --git a/selenium_driver_updater/operaDriver.py b/selenium_driver_updater/operaDriver.py index 8712bd2..852555c 100644 --- a/selenium_driver_updater/operaDriver.py +++ b/selenium_driver_updater/operaDriver.py @@ -15,8 +15,6 @@ import requests -import zipfile - import sys import os.path sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__)))) @@ -29,7 +27,12 @@ from shutil import copyfile +from util.extractor import Extractor +from util.github_viewer import GithubViewer + class OperaDriver(): + + _repo_name = 'operasoftware/operachromiumdriver' def __init__(self, path : str, upgrade : bool, chmod : bool, check_driver_is_up_to_date : bool, info_messages : bool, filename : str, version : str): @@ -73,6 +76,10 @@ def __init__(self, path : str, upgrade : bool, chmod : bool, check_driver_is_up_ self.version = version + self.extractor = Extractor + + self.github_viewer = GithubViewer + def __get_current_version_operadriver_selenium(self) -> Tuple[bool, str, str]: """Gets current operadriver version @@ -149,11 +156,11 @@ def __get_latest_version_operadriver(self) -> Tuple[bool, str, str]: try: - request = requests.get(self.setting['OperaDriver']['LinkLastRelease'], headers=self.headers) - request_text = request.text - json_data = json.loads(str(request_text)) + repo_name = OperaDriver._repo_name + result, message, json_data = self.github_viewer.get_latest_release_data_by_repo_name(repo_name=repo_name) + if not result: + return result, message, latest_version - latest_version = json_data.get('name') logging.info(f'Latest version of operadriver: {latest_version}') @@ -222,26 +229,18 @@ def __get_latest_operadriver_for_current_os(self) -> Tuple[bool, str, str]: operadriver_version : str = '' try: - request = requests.get(self.setting['OperaDriver']['LinkLastRelease'], headers=self.headers) - request_text = request.text - json_data = json.loads(str(request_text)) - - operadriver_version = json_data.get('name') - for asset in json_data.get('assets'): - if self.setting['OperaDriver']['LinkLastReleasePlatform'] == asset.get('name'): - filename_git = asset.get('name') - url = asset.get('browser_download_url') - break + repo_name = OperaDriver._repo_name + asset_name = self.setting['OperaDriver']['LinkLastReleasePlatform'] + result, message, data = self.github_viewer.get_specific_asset_by_repo_name(repo_name=repo_name, asset_name=asset_name) + if not result: + return result, message, file_name - if not filename_git: - message = (f"Operadriver binary was not found, maybe unknown OS" - f"LinkLastReleasePlatform: {self.setting['OperaDriver']['LinkLastReleasePlatform']}") - logging.error(message) - return False, message, file_name + operadriver_version = data[0].get('version') + url = data[0].get('asset').get('browser_download_url') logging.info(f'Started download operadriver operadriver_version: {operadriver_version}') - out_path = self.path + filename_git + out_path = self.path + data[0].get('asset').get('name') if os.path.exists(out_path): os.remove(out_path) @@ -254,8 +253,12 @@ def __get_latest_operadriver_for_current_os(self) -> Tuple[bool, str, str]: time.sleep(2) - with zipfile.ZipFile(file_name, 'r') as zip_ref: - zip_ref.extractall(self.path) + archive_path = file_name + out_path = self.path + result, message = self.extractor.extract_all_zip_archive(archive_path=archive_path, out_path=out_path) + if not result: + logging.error(message) + return result, message, file_name time.sleep(3) @@ -281,9 +284,9 @@ def __get_latest_operadriver_for_current_os(self) -> Tuple[bool, str, str]: file_name = self.operadriver_path - result_run = True + logging.info(f'Operadriver was successfully unpacked by path: {file_name}') - logging.info(f'Operadriver was successfully updated to the last version: {operadriver_version}') + result_run = True except: message_run = f'Unexcepted error: {str(traceback.format_exc())}' @@ -438,7 +441,7 @@ def __rename_driver(self, archive_folder_path : str, archive_operadriver_path : if os.path.exists(renamed_driver_path): os.remove(renamed_driver_path) - copyfile(new_path, self.path + self.filename) + copyfile(new_path, renamed_driver_path) result_run = True @@ -593,33 +596,22 @@ def __get_specific_version_operadriver_for_current_os(self, version : str) -> Tu result_run : bool = False message_run : str = '' file_name : str = '' - filename_git : str = '' url : str = '' try: - url = self.setting["OperaDriver"]["LinkAllReleases"] - request = requests.get(url=url, headers=self.headers) - request_text = request.text - json_data = json.loads(str(request_text)) - - for release in json_data: - if version == release.get('name') or version in release.get('tag_name'): - for asset in release.get('assets'): - if self.setting['OperaDriver']['LinkLastReleasePlatform'] in asset.get('name'): - filename_git = asset.get('name') - url = asset.get('browser_download_url') - break - - if not filename_git: - message = (f"OperaDriver binary was not found, maybe unknown OS or incorrect version " - f"LinkLastReleasePlatform: {self.setting['OperaDriver']['LinkLastReleasePlatform']} " - f"specific_version: {version}" ) - logging.error(message) - return False, message, file_name + repo_name = OperaDriver._repo_name + asset_name = self.setting['OperaDriver']['LinkLastReleasePlatform'] + version = version + result, message, data = self.github_viewer.get_specific_asset_by_specific_version_by_repo_name(repo_name=repo_name, + asset_name=asset_name, version=version) + if not result: + return result, message, file_name + + url = data[0].get('asset').get('browser_download_url') logging.info(f'Started download operadriver specific_version: {version}') - out_path = self.path + filename_git + out_path = self.path + data[0].get('asset').get('name') if os.path.exists(out_path): os.remove(out_path) @@ -632,8 +624,12 @@ def __get_specific_version_operadriver_for_current_os(self, version : str) -> Tu time.sleep(2) - with zipfile.ZipFile(file_name, 'r') as zip_ref: - zip_ref.extractall(self.path) + archive_path = file_name + out_path = self.path + result, message = self.extractor.extract_all_zip_archive(archive_path=archive_path, out_path=out_path) + if not result: + logging.error(message) + return result, message, file_name time.sleep(3) diff --git a/selenium_driver_updater/util/__init__.py b/selenium_driver_updater/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/selenium_driver_updater/util/extractor.py b/selenium_driver_updater/util/extractor.py new file mode 100644 index 0000000..ad81f05 --- /dev/null +++ b/selenium_driver_updater/util/extractor.py @@ -0,0 +1,141 @@ +import traceback +import logging +from typing import Tuple +import zipfile +import os +import shutil +from shutil import copyfile +import tarfile + +class Extractor(): + + @staticmethod + def extract_all_zip_archive(archive_path : str, out_path : str) -> Tuple[bool, str]: + """Extract all members in specific zip archive + + Args: + archive_path (str) : Path to specific archive. + out_path (str) : Out path, where all members of archive will be gathered. + + + Returns: + Tuple of bool and str + + result_run (bool) : True if function passed correctly, False otherwise. + message_run (str) : Empty string if function passed correctly, non-empty string if error. + """ + result_run : bool = False + message_run : str = '' + try: + + with zipfile.ZipFile(archive_path, 'r') as zip_ref: + zip_ref.extractall(out_path) + + if os.path.exists(archive_path): + os.remove(archive_path) + + result_run = True + + except: + + message_run = f'Unexcepted error: {str(traceback.format_exc())}' + logging.error(message_run) + + return result_run, message_run + + @staticmethod + def extract_all_tar_gz_archive(archive_path : str, out_path : str) -> Tuple[bool, str]: + """Extract all members in specific tar.gz archive + + Args: + archive_path (str) : Path to specific archive. + out_path (str) : Out path, where all members of archive will be gathered. + + Returns: + Tuple of bool and str + + result_run (bool) : True if function passed correctly, False otherwise. + message_run (str) : Empty string if function passed correctly, non-empty string if error. + """ + result_run : bool = False + message_run : str = '' + try: + + with tarfile.open(archive_path, "r:gz") as tar_ref: + tar_ref.extractall(out_path) + + if os.path.exists(archive_path): + os.remove(archive_path) + + result_run = True + + except: + message_run = f'Unexcepted error: {str(traceback.format_exc())}' + logging.error(message_run) + + return result_run, message_run + + @staticmethod + def extract_all_zip_archive_with_specific_name(archive_path : str, out_path : str, filename : str, filename_replace : str) -> Tuple[bool, str]: + """Extract all zip archive and replaces name for one of member + + Args: + archive_path (str) : Path to specific archive. + out_path (str) : Out path, where all members of archive will be gathered. + filename (str) : Archive member whose name will be changed. + filename_replace (str) : Specific name for replacing. + + Returns: + Tuple of bool and str + + result_run (bool) : True if function passed correctly, False otherwise. + message_run (str) : Empty string if function passed correctly, non-empty string if error. + """ + result_run : bool = False + message_run : str = '' + try: + + driver_folder_path = out_path + 'tmp' + logging.info(f'Created new safety directory for replacing filename: {filename} filename_replace: {filename_replace}') + + if os.path.exists(driver_folder_path): + shutil.rmtree(driver_folder_path) + + if archive_path.endswith('.tar.gz'): + + result, message = Extractor.extract_all_tar_gz_archive(archive_path=archive_path, out_path=driver_folder_path) + if not result: + return result, message + + elif archive_path.endswith('.zip'): + + result, message = Extractor.extract_all_zip_archive(archive_path=archive_path, out_path=driver_folder_path) + if not result: + return result, message + + else: + message = f'Unknown archive format was specified archive_path: {archive_path}' + logging.error(message) + return result_run, message + + old_path = driver_folder_path + os.path.sep + filename + new_path = driver_folder_path + os.path.sep + filename_replace + + os.rename(old_path, new_path) + + renamed_driver_path = out_path + filename_replace + if os.path.exists(renamed_driver_path): + os.remove(renamed_driver_path) + + copyfile(new_path, renamed_driver_path) + + if os.path.exists(driver_folder_path): + shutil.rmtree(driver_folder_path) + + result_run = True + + except: + message_run = f'Unexcepted error: {str(traceback.format_exc())}' + logging.error(message_run) + + return result_run, message_run \ No newline at end of file diff --git a/selenium_driver_updater/util/github_viewer.py b/selenium_driver_updater/util/github_viewer.py new file mode 100644 index 0000000..d9b6e25 --- /dev/null +++ b/selenium_driver_updater/util/github_viewer.py @@ -0,0 +1,155 @@ +from typing import Any, Tuple +import requests +import traceback +import logging +import json + +class GithubViewer(): + + user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) \ + Chrome/35.0.1916.47 Safari/537.36' + + _headers = {'User-Agent': user_agent} + + @staticmethod + def get_latest_release_data_by_repo_name(repo_name : str) -> Tuple[bool, str, Any]: + """Gets latest release asset by github repository name + + Args: + repo_name (str): Repository name on github. Something like operasoftware/operachromiumdriver + + Returns: + Tuple of bool, str and Any + + result_run (bool) : True if function passed correctly, False otherwise. + message_run (str) : Empty string if function passed correctly, non-empty string if error. + json_data : All latest release data. + """ + + result_run : bool = False + message_run : str = '' + url : str = f'https://api.github.com/repos/{repo_name}/releases/latest' + json_data : Any = '' + + try: + + request = requests.get(url, headers=GithubViewer._headers) + request_text = request.text + json_data = json.loads(str(request_text)) + + result_run = True + + except: + + message_run = f'Unexcepted error: {str(traceback.format_exc())}' + logging.error(message_run) + + return result_run, message_run, json_data + + @staticmethod + def get_specific_asset_by_repo_name(asset_name : str, repo_name : str): + """Gets specific asset by asset name and repository name + + Args: + asset_name (str) : Specific name of asset. + repo_name (str) : Repository name on github. Something like operasoftware/operachromiumdriver + + Returns: + Tuple of bool, str and Any + + result_run (bool) : True if function passed correctly, False otherwise. + message_run (str) : Empty string if function passed correctly, non-empty string if error. + json_data : All gathered data. + """ + + result_run : bool = False + message_run : str = '' + url : str = f'https://api.github.com/repos/{repo_name}/releases/latest' + data = [] + asset = '' + is_found : bool = False + specific_asset = [] + + try: + + request = requests.get(url, headers=GithubViewer._headers) + request_text = request.text + json_data = json.loads(str(request_text)) + + version = json_data.get('name') + + for asset in json_data.get('assets'): + if asset_name == asset.get('name') or asset_name in asset.get('name'): + is_found = True + specific_asset = asset + break + + if not is_found: + message = f"Specific binary was not found, maybe unknown OS. asset_name: {asset_name} repo_name: {repo_name}" + logging.error(message) + return result_run, message, data + + data.append(dict(version=version, asset=specific_asset)) + + result_run = True + + except: + message_run = f'Unexcepted error: {str(traceback.format_exc())}' + logging.error(message_run) + + return result_run, message_run, data + + @staticmethod + def get_specific_asset_by_specific_version_by_repo_name(version : str, asset_name : str, repo_name : str): + """Gets specific asset by specific release by repository name + + Args: + version (str) : Specific release version of asset. + asset_name (str) : Specific name of asset. + repo_name (str) : Repository name on github. Something like operasoftware/operachromiumdriver + + Returns: + Tuple of bool, str and Any + + result_run (bool) : True if function passed correctly, False otherwise. + message_run (str) : Empty string if function passed correctly, non-empty string if error. + json_data : All gathered data. + """ + + result_run : bool = False + message_run : str = '' + url : str = f'https://api.github.com/repos/{repo_name}/releases' + data = [] + asset = '' + is_found : bool = False + specific_asset = [] + + try: + + request = requests.get(url=url, headers=GithubViewer._headers) + request_text = request.text + json_data = json.loads(str(request_text)) + + for release in json_data: + if version == release.get('name') or version in release.get('tag_name'): + for asset in release.get('assets'): + if asset_name in asset.get('name'): + is_found = True + specific_asset = asset + break + + if not is_found: + message = (f"Specific binary by specific release was not found, maybe unknown OS." + f"version: {version} asset_name: {asset_name} repo_name: {repo_name}") + logging.error(message) + return result_run, message, data + + data.append(dict(version=version, asset=specific_asset)) + + result_run = True + + except: + message_run = f'Unexcepted error: {str(traceback.format_exc())}' + logging.error(message_run) + + return result_run, message_run, data \ No newline at end of file diff --git a/setup.py b/setup.py index 58c4234..f22792f 100644 --- a/setup.py +++ b/setup.py @@ -9,10 +9,19 @@ 'License :: OSI Approved :: MIT License', 'Programming Language :: Python :: 3' ] + +keywords = ['chromedriver', 'operadriver', + 'edgedriver', 'safaridriver', + 'selenium', 'seleniumdriver', + 'chromedriver-binary', 'selenium-binary', + 'selenium-python', 'selenium-driver', + 'geckodriver', 'geckodriver-binary', + 'operadriver-binary', 'edgedriver-binary', + 'safaridriver-binary'] setup( name='selenium_driver_updater', - version='1.7.1', + version='1.8.0', description='Download or update your Selenium driver binaries automatically with this package', long_description=long_description, long_description_content_type='text/markdown', @@ -21,7 +30,7 @@ author_email='stasvinokur@yahoo.com', license='MIT', classifiers=classifiers, - keywords=['chromedriver', 'operadriver', 'edgedriver', 'safaridriver', 'selenium', 'seleniumdriver', 'chromedriver-binary', 'selenium-binary', 'selenium-python'], + keywords=keywords, packages=['selenium_driver_updater'], install_requires=['wget', 'requests', 'selenium', 'beautifulsoup4', 'lxml', 'msedge-selenium-tools'] ) \ No newline at end of file