From 687c9eb6a607402dc0a009fdf435e1750c79ca85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Brunner?= Date: Thu, 27 Feb 2025 14:16:44 +0100 Subject: [PATCH] Fix new Prospector issues --- .pre-commit-config.yaml | 6 +-- app/shared_config_manager/security.py | 19 +++---- app/shared_config_manager/services.py | 54 +++++++++++-------- app/shared_config_manager/sources/base.py | 41 +++++++------- app/shared_config_manager/sources/git.py | 29 +++++----- app/shared_config_manager/sources/rclone.py | 22 ++++---- app/shared_config_manager/sources/registry.py | 19 +++---- app/shared_config_manager/sources/ssh.py | 40 +++++--------- .../template_engines/base.py | 25 ++++----- .../template_engines/mako.py | 6 ++- .../template_engines/shell.py | 5 +- app/shared_config_manager/views.py | 15 +++--- app/tests/template_engines/test_shell.py | 17 +++--- 13 files changed, 151 insertions(+), 147 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index db3d45a2..dbf920ad 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -65,7 +65,7 @@ repos: args: - --line-length=110 - repo: https://github.com/PyCQA/prospector - rev: v1.15.0 + rev: v1.15.1 hooks: - id: prospector args: @@ -75,7 +75,7 @@ repos: - --profile=app/.prospector.yaml additional_dependencies: - prospector-profile-duplicated==1.10.4 # pypi - - prospector-profile-utils==1.21.4 # pypi + - prospector-profile-utils==1.21.6 # pypi - id: prospector args: - --die-on-tool-error @@ -83,7 +83,7 @@ repos: - --profile=utils:tests - --profile=utils:pre-commit additional_dependencies: - - prospector-profile-utils==1.21.4 # pypi + - prospector-profile-utils==1.21.6 # pypi - repo: https://github.com/sbrunner/jsonschema-validator rev: 1.0.0 hooks: diff --git a/app/shared_config_manager/security.py b/app/shared_config_manager/security.py index 70503956..7fd0d21d 100644 --- a/app/shared_config_manager/security.py +++ b/app/shared_config_manager/security.py @@ -27,13 +27,14 @@ class User: def __init__( self, auth_type: str, - login: str | None, - name: str | None, - url: str | None, - is_auth: bool, - token: str | None, - request: pyramid.request.Request, + login: str | None = None, + name: str | None = None, + url: str | None = None, + is_auth: bool = False, + token: str | None = None, + request: pyramid.request.Request | None = None, ) -> None: + assert request is not None self.auth_type = auth_type self.login = login self.name = name @@ -82,7 +83,7 @@ def identity(self, request: pyramid.request.Request) -> User: our_signature, request.headers["X-Hub-Signature-256"].split("=", 1)[1], ): - user = User("github_webhook", None, None, None, True, None, request) + user = User("github_webhook", is_auth=True, request=request) else: _LOG.warning("Invalid GitHub signature") _LOG.debug( @@ -101,7 +102,7 @@ def identity(self, request: pyramid.request.Request) -> User: elif "X-Scm-Secret" in request.headers and "SCM_SECRET" in os.environ: if request.headers["X-Scm-Secret"] == os.environ["SCM_SECRET"]: - user = User("scm_internal", None, None, None, True, None, request) + user = User("scm_internal", is_auth=True, request=request) else: _LOG.warning("Invalid SCM secret") @@ -120,7 +121,7 @@ def identity(self, request: pyramid.request.Request) -> User: request.user = user - return request.user # type: ignore + return request.user # type: ignore[misc] def authenticated_userid(self, request: pyramid.request.Request) -> str | None: """Return a string ID for the user.""" diff --git a/app/shared_config_manager/services.py b/app/shared_config_manager/services.py index ce7bf16f..4ce2d540 100644 --- a/app/shared_config_manager/services.py +++ b/app/shared_config_manager/services.py @@ -25,24 +25,27 @@ __BRANCH_NAME_SANITIZER = re.compile(r"[^0-9a-zA-z-_]") -@_refresh_service.get() # type: ignore +@_refresh_service.get() # type: ignore[misc] def _refresh_view(request: pyramid.request.Request) -> dict[str, Any]: id_ = request.matchdict["id"] source, _ = registry.get_source_check_auth(id_=id_, request=request) if source is None: - raise HTTPNotFound(f"Unknown id {id_}") + message = f"Unknown id {id_}" + raise HTTPNotFound(message) return _refresh(request) -@_refresh_service.post() # type: ignore +@_refresh_service.post() # type: ignore[misc] def _refresh_webhook(request: pyramid.request.Request) -> dict[str, Any]: id_ = request.matchdict["id"] source, _ = registry.get_source_check_auth(id_=id_, request=request) if source is None: - raise HTTPNotFound(f"Unknown id {id_}") + message = f"Unknown id {id_}" + raise HTTPNotFound(message) if source.get_type() != "git": - raise HTTPServerError(f"Non GIT source {id_} cannot be refreshed by a webhook") + message = f"Non GIT source {id_} cannot be refreshed by a webhook" + raise HTTPServerError(message) source_git = cast(git.GitSource, source) @@ -52,7 +55,8 @@ def _refresh_webhook(request: pyramid.request.Request) -> dict[str, Any]: ref = request.json.get("ref") if ref is None: - raise HTTPServerError(f"Webhook for {id_} is missing the ref") + message = "Webhook for {id_} is missing the ref" + raise HTTPServerError(message) if ref != "refs/heads/" + source_git.get_branch(): _LOG.info( "Ignoring webhook notif for non-matching branch %s on %s", @@ -70,10 +74,11 @@ def _refresh(request: pyramid.request.Request) -> dict[str, Any]: return {"status": 200} -@_refresh_all_service.get() # type: ignore +@_refresh_all_service.get() # type: ignore[misc] def _refresh_all(request: pyramid.request.Request) -> dict[str, Any]: if not registry.MASTER_SOURCE: - raise HTTPServerError("Master source not initialized") + message = "Master source not initialized" + raise HTTPServerError(message) registry.MASTER_SOURCE.validate_auth(request) nb_refresh = 0 for id_ in registry.get_sources(): @@ -82,10 +87,11 @@ def _refresh_all(request: pyramid.request.Request) -> dict[str, Any]: return {"status": 200, "nb_refresh": nb_refresh} -@_refresh_all_service.post() # type: ignore +@_refresh_all_service.post() # type: ignore[misc] def _refresh_all_webhook(request: pyramid.request.Request) -> dict[str, Any]: if not registry.MASTER_SOURCE: - raise HTTPServerError("Master source not initialized") + message = "Master source not initialized" + raise HTTPServerError(message) registry.MASTER_SOURCE.validate_auth(request=request) if request.headers.get("X-GitHub-Event") != "push": @@ -94,7 +100,8 @@ def _refresh_all_webhook(request: pyramid.request.Request) -> dict[str, Any]: ref = request.json.get("ref") if ref is None: - raise HTTPServerError("Webhook is missing the ref") + message = "Webhook is missing the ref" + raise HTTPServerError(message) nb_refresh = 0 for id_, source in registry.get_sources().items(): @@ -117,7 +124,7 @@ def _refresh_all_webhook(request: pyramid.request.Request) -> dict[str, Any]: return {"status": 200, "nb_refresh": nb_refresh} -@_status_service.get() # type: ignore +@_status_service.get() # type: ignore[misc] def _stats(request: pyramid.request.Request) -> dict[str, Any]: if not registry.MASTER_SOURCE: return {"slaves": {}} @@ -128,12 +135,13 @@ def _stats(request: pyramid.request.Request) -> dict[str, Any]: return {"slaves": slaves} -@_source_stats_service.get() # type: ignore +@_source_stats_service.get() # type: ignore[misc] def _source_stats(request: pyramid.request.Request) -> dict[str, Any]: id_ = request.matchdict["id"] source, _ = registry.get_source_check_auth(id_=id_, request=request) if source is None: - raise HTTPNotFound(f"Unknown id {id_}") + message = f"Unknown id {id_}" + raise HTTPNotFound(message) slaves: list[SourceStatus] | None = slave_status.get_source_status(id_=id_) assert slaves is not None statuses: list[SourceStatus] = [] @@ -154,20 +162,23 @@ def _cleanup_slave_status(status: BroadcastObject) -> BroadcastObject: return result -@_tarball_service.get() # type: ignore +@_tarball_service.get() # type: ignore[m] def _tarball(request: pyramid.request.Request) -> pyramid.response.Response: id_ = request.matchdict["id"] source, filtered = registry.get_source_check_auth(id_=id_, request=request) if source is None: - raise HTTPNotFound(f"Unknown id {id_}") + message = f"Unknown id {id_}" + raise HTTPNotFound(message) if not source.is_loaded(): - raise HTTPNotFound("Not loaded yet") + message = "Not loaded yet" + raise HTTPNotFound(message) assert not filtered path = source.get_path() - if not os.path.isdir(path): + if not path.is_dir(): _LOG.error("The path %s does not exists or is not a path, for the source %s.", path, source.get_id()) - raise HTTPNotFound("Not loaded yet: path didn't exists") + message = "Not loaded yet: path didn't exists" + raise HTTPNotFound(message) response: pyramid.response.Response = request.response @@ -190,9 +201,10 @@ def _tarball(request: pyramid.request.Request) -> pyramid.response.Response: def _proc_iter(proc: subprocess.Popen[bytes]) -> Iterable[bytes | Any]: while True: - block = proc.stdout.read(4096) # type: ignore + block = proc.stdout.read(4096) # type: ignore[m] if not block: break yield block if proc.wait() != 0: - raise HTTPServerError("Error building the tarball") + message = "Error building the tarball" + raise HTTPServerError(message) diff --git a/app/shared_config_manager/sources/base.py b/app/shared_config_manager/sources/base.py index 2d92f32e..3b82bc88 100644 --- a/app/shared_config_manager/sources/base.py +++ b/app/shared_config_manager/sources/base.py @@ -1,10 +1,10 @@ import copy import logging import os -import pathlib import shutil import subprocess # nosec import time +from pathlib import Path from typing import Any, cast import pyramid.request @@ -19,8 +19,8 @@ from shared_config_manager.sources import mode _LOG = logging.getLogger(__name__) -_TARGET = os.environ.get("TARGET", "/config") -_MASTER_TARGET = os.environ.get("MASTER_TARGET", "/master_config") +_TARGET = Path(os.environ.get("TARGET", "/config")) +_MASTER_TARGET = Path(os.environ.get("MASTER_TARGET", "/master_config")) _RETRY_NUMBER = int(os.environ.get("SCM_RETRY_NUMBER", 3)) _RETRY_DELAY = int(os.environ.get("SCM_RETRY_DELAY", 1)) @@ -99,7 +99,7 @@ def _eval_templates(self) -> None: # all the files that are created by template engines (see the --delete rsync flag in # BaseSource._copy). root_dir = self.get_path() - files = [os.path.relpath(str(p), root_dir) for p in pathlib.Path(root_dir).glob("**/*")] + files = [os.path.relpath(str(p), root_dir) for p in root_dir.glob("**/*")] for engine in self._template_engines: with _TEMPLATE_SUMMARY.labels(self.get_id(), engine.get_type()).time(): @@ -134,9 +134,9 @@ def _do_fetch(self) -> None: _LOG.info("Doing a fetch of %s", self.get_id()) response = requests.get(url, headers={"X-Scm-Secret": os.environ["SCM_SECRET"]}, stream=True) response.raise_for_status() - if os.path.exists(path): + if path.exists(): shutil.rmtree(path) - os.makedirs(path, exist_ok=True) + path.mkdir(parent=True, exist_ok=True) with subprocess.Popen( # nosec [ "tar", @@ -150,11 +150,10 @@ def _do_fetch(self) -> None: cwd=path, stdin=subprocess.PIPE, ) as tar: - shutil.copyfileobj(response.raw, tar.stdin) # type: ignore - tar.stdin.close() # type: ignore + shutil.copyfileobj(response.raw, tar.stdin) # type: ignore[m] + tar.stdin.close() # type: ignore[m] assert tar.wait() == 0 - return - except Exception as exception: # pylint: disable=broad-exception-caught + except Exception as exception: # pylint: disable=broad-exception-caught # noqa: PERF203 _DO_FETCH_ERROR_COUNTER.labels(self.get_id()).inc() retry_message = f" (will retry in {_RETRY_DELAY}s)" if i else " (failed)" _LOG.warning( @@ -167,9 +166,11 @@ def _do_fetch(self) -> None: time.sleep(_RETRY_DELAY) else: raise + else: + return def _copy(self, source: str, excludes: list[str] | None = None) -> None: - os.makedirs(self.get_path(), exist_ok=True) + self.get_path().mkdir(parents=True, exist_ok=True) cmd = [ "rsync", "--recursive", @@ -184,23 +185,23 @@ def _copy(self, source: str, excludes: list[str] | None = None) -> None: cmd += ["--exclude=" + exclude for exclude in excludes] if "excludes" in self._config: cmd += ["--exclude=" + exclude for exclude in self._config["excludes"]] - cmd += [source + "/", self.get_path()] + cmd += [source + "/", str(self.get_path())] with _COPY_SUMMARY.labels(self.get_id()).time(): self._exec(*cmd) def delete_target_dir(self) -> None: dest = self.get_path() _LOG.info("Deleting target dir %s", dest) - if os.path.isdir(dest): + if dest.is_dir(): shutil.rmtree(dest) - def get_path(self) -> str: + def get_path(self) -> Path: if "target_dir" in self._config: target_dir = self._config["target_dir"] if target_dir.startswith("/"): - return target_dir - return _MASTER_TARGET if self._is_master else os.path.join(_TARGET, target_dir) - return _MASTER_TARGET if self._is_master else os.path.join(_TARGET, self.get_id()) + return Path(target_dir) + return _MASTER_TARGET if self._is_master else _TARGET / target_dir + return _MASTER_TARGET if self._is_master else _TARGET / self.get_id() def get_id(self) -> str: return self._id @@ -208,7 +209,8 @@ def get_id(self) -> str: def validate_auth(self, request: pyramid.request.Request) -> None: permission = request.has_permission("all", self.get_config()) if not isinstance(permission, Allowed): - raise HTTPForbidden("Not allowed to access this source") + message = "Not allowed to access this source" + raise HTTPForbidden(message) def is_master(self) -> bool: return self._is_master @@ -255,10 +257,11 @@ def _exec(*args: Any, **kwargs: Any) -> str: ) if output: _LOG.debug(output) - return output except subprocess.CalledProcessError as exception: _LOG.error(exception.output.decode("utf-8").strip()) raise + else: + return output def is_loaded(self) -> bool: return self._is_loaded diff --git a/app/shared_config_manager/sources/git.py b/app/shared_config_manager/sources/git.py index d8f551c4..d7e775c1 100644 --- a/app/shared_config_manager/sources/git.py +++ b/app/shared_config_manager/sources/git.py @@ -1,7 +1,6 @@ import base64 import json import logging -import os import subprocess import tempfile from pathlib import Path @@ -10,7 +9,7 @@ from shared_config_manager.sources import mode from shared_config_manager.sources.ssh import SshBaseSource -TEMP_DIR = tempfile.gettempdir() +TEMP_DIR = Path(tempfile.gettempdir()) LOG = logging.getLogger(__name__) @@ -21,14 +20,14 @@ def _do_refresh(self) -> None: self._checkout() self._copy(self._copy_dir(), excludes=[".git"]) stats = {"hash": self._get_hash(), "tags": self._get_tags()} - with open(os.path.join(self.get_path(), ".gitstats"), "w", encoding="utf-8") as gitstats: + with (self.get_path() / ".gitstats").open("w", encoding="utf-8") as gitstats: json.dump(stats, gitstats) def _checkout(self) -> None: cwd = self._clone_dir() repo = self._get_repo() branch = self.get_branch() - git_dir = Path(cwd) / ".git" + git_dir = cwd / ".git" if git_dir.is_dir(): LOG.info("Fetching a new version of %s", repo) try: @@ -44,22 +43,22 @@ def _checkout(self) -> None: self._exec("git-sparse-clone", repo, branch, cwd, self._config["sub_dir"], cwd=TEMP_DIR) else: LOG.info("Cloning %s", repo) - if os.path.exists(cwd): - os.removedirs(cwd) - os.makedirs(os.path.dirname(cwd), exist_ok=True) - command = ["git", "clone", f"--branch={branch}", "--depth=1", repo, os.path.basename(cwd)] + if cwd.exists(): + cwd.removedirs() + cwd.parent.mkdir(parent=True, exist_ok=True) + command = ["git", "clone", f"--branch={branch}", "--depth=1", repo, str(cwd.parent)] self._exec(*command, cwd=TEMP_DIR) def _get_repo(self) -> str: return self._config["repo"] - def _clone_dir(self) -> str: + def _clone_dir(self) -> Path: if self._do_sparse(): - return os.path.join(TEMP_DIR, self.get_id()) + return TEMP_DIR / self.get_id() # The directory we clone into is not fct(id), but in function of the repository and the # branch. That way, if two sources are other sub-dirs of the same repo, we clone it only once. encoded_repo = base64.urlsafe_b64encode(self._get_repo().encode("utf-8")).decode("utf-8") - return os.path.join(TEMP_DIR, encoded_repo) + return TEMP_DIR / encoded_repo def _do_sparse(self) -> bool: return "sub_dir" in self._config and self._config.get("sparse", True) @@ -68,13 +67,13 @@ def _copy_dir(self) -> str: sub_dir = self._config.get("sub_dir") if sub_dir is None: return self._clone_dir() - return os.path.join(self._clone_dir(), sub_dir) + return self._clone_dir() / sub_dir def get_stats(self) -> SourceStatus: stats = super().get_stats() - stats_path = os.path.join(self.get_path(), ".gitstats") - if os.path.isfile(stats_path): - with open(stats_path, encoding="utf-8") as gitstats: + stats_path = self.get_path() / ".gitstats" + if stats_path.isfile(): + with stats_path.open(encoding="utf-8") as gitstats: stats.update(json.load(gitstats)) return stats diff --git a/app/shared_config_manager/sources/rclone.py b/app/shared_config_manager/sources/rclone.py index 367aedf4..5d5a8911 100644 --- a/app/shared_config_manager/sources/rclone.py +++ b/app/shared_config_manager/sources/rclone.py @@ -1,5 +1,5 @@ -import os import re +from pathlib import Path from shared_config_manager.configuration import SourceConfig, SourceStatus from shared_config_manager.sources.base import BaseSource @@ -13,25 +13,25 @@ def __init__(self, id_: str, config: SourceConfig, is_master: bool) -> None: self._setup_config(config["config"]) def _do_refresh(self) -> None: - was_here = os.path.isdir(self.get_path()) - target = self.get_path() + ("" if was_here else ".tmp") - os.makedirs(target, exist_ok=True) - cmd = ["rclone", "sync", "--verbose", "--config", self._config_path()] + was_here = self.get_path().is_dir() + target = self.get_path() if was_here else self.get_path().with_suffix(".tmp") + target.mkdir(parents=True, exist_ok=True) + cmd = ["rclone", "sync", "--verbose", "--config", str(self._config_path())] if "excludes" in self._config: cmd += ["--exclude=" + exclude for exclude in self._config["excludes"]] - cmd += ["remote:" + self.get_config().get("sub_dir", ""), target] + cmd += ["remote:" + self.get_config().get("sub_dir", ""), str(target)] self._exec(*cmd) if not was_here: - os.rename(target, self.get_path()) + target.rename(self.get_path()) - def _config_path(self) -> str: - return os.path.join(os.environ["HOME"], ".config", "rclone", self.get_id() + ".conf") + def _config_path(self) -> Path: + return Path.home() / ".config" / "rclone" / f"{self.get_id()}.conf" def _setup_config(self, config: str) -> None: path = self._config_path() - os.makedirs(os.path.dirname(path), exist_ok=True) - with open(path, "w", encoding="utf-8") as file_: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8") as file_: file_.write("[remote]\n") file_.write(config) diff --git a/app/shared_config_manager/sources/registry.py b/app/shared_config_manager/sources/registry.py index 1edf0b78..7b8677ba 100644 --- a/app/shared_config_manager/sources/registry.py +++ b/app/shared_config_manager/sources/registry.py @@ -1,9 +1,9 @@ import logging import os -import pathlib import subprocess # nosec import tempfile from collections.abc import Mapping +from pathlib import Path from threading import Thread import inotify.adapters @@ -49,7 +49,7 @@ def init(slave: bool) -> None: content = yaml.load(os.environ["MASTER_CONFIG"], Loader=yaml.SafeLoader) else: _LOG.info("Load the master config from config file") - with open("/etc/shared_config_manager/config.yaml", encoding="utf-8") as scm_file: + with Path("/etc/shared_config_manager/config.yaml").open(encoding="utf-8") as scm_file: content = yaml.load(scm_file, Loader=yaml.SafeLoader) def thread() -> None: @@ -61,7 +61,7 @@ def thread() -> None: "IN_CLOSE_WRITE" in type_names or "IN_IGNORED" in type_names ): # the IN_IGNORED it for move file on this one _LOG.info("Reload the master config from config file") - with open("/etc/shared_config_manager/config.yaml", encoding="utf-8") as scm_file: + with Path("/etc/shared_config_manager/config.yaml").open(encoding="utf-8") as scm_file: config = yaml.load(scm_file, Loader=yaml.SafeLoader) _handle_master_config(config) @@ -86,8 +86,7 @@ def thread() -> None: def reload_master_config() -> None: """Reload the master config.""" if MASTER_SOURCE: - with open( - os.path.join(MASTER_SOURCE.get_path(), "shared_config_manager.yaml"), + with (MASTER_SOURCE.get_path() / "shared_config_manager.yaml").open( encoding="utf-8", ) as config_file: config = yaml.load(config_file, Loader=yaml.SafeLoader) @@ -130,7 +129,8 @@ def _do_handle_master_config(config: Config) -> tuple[int, int]: def _handle_master_config(config: Config) -> None: _LOG.info("Reading the master config") if _MASTER_ID in config["sources"]: - raise HTTPBadRequest(f'A source cannot have the "{_MASTER_ID}" id') + message = f"A source cannot have the {_MASTER_ID} id" + raise HTTPBadRequest(message) success, errors = _do_handle_master_config(config) if errors != 0: if success != 0: @@ -143,12 +143,12 @@ def _handle_master_config(config: Config) -> None: def _update_flag(value: str) -> None: - with open(os.path.join(tempfile.gettempdir(), "status"), "w", encoding="utf-8") as flag: + with (Path(tempfile.gettempdir()) / "status").open("w", encoding="utf-8") as flag: flag.write(value) def _prepare_ssh() -> None: - home = pathlib.Path.home() + home = Path.home() other_ssh = home.joinpath(".ssh2") if other_ssh.is_dir(): ssh = home.joinpath(".ssh") @@ -193,7 +193,8 @@ def refresh(id_: str, request: pyramid.request.Request | None) -> None: _LOG.info("Reloading the %s config", id_) source, _ = get_source_check_auth(id_, request) if source is None: - raise HTTPNotFound(f"Unknown id {id_}") + message = f"Unknown id {id_}" + raise HTTPNotFound(message) source.refresh() if source.is_master() and (not MASTER_SOURCE or not MASTER_SOURCE.get_config().get("standalone", False)): reload_master_config() diff --git a/app/shared_config_manager/sources/ssh.py b/app/shared_config_manager/sources/ssh.py index 0e507f47..87d8130e 100644 --- a/app/shared_config_manager/sources/ssh.py +++ b/app/shared_config_manager/sources/ssh.py @@ -1,24 +1,11 @@ import fileinput import os +from pathlib import Path from shared_config_manager.configuration import SourceConfig, SourceStatus from shared_config_manager.sources.base import BaseSource -def _patch_openshift() -> None: - os.environ["HOME"] = "/var/www" - try: - with open("/etc/passwd", "a", encoding="utf-8") as passwd: - passwd.write(f"www-data2:x:{os.getuid()}:0:www-data:/var/www:/usr/sbin/nologin\n") - except PermissionError: - pass # ignored - - -# hack to work around an OpenShift "security" -if os.getuid() not in (33, 0): - _patch_openshift() - - class SshBaseSource(BaseSource): """Source to get files from SSH server.""" @@ -30,20 +17,19 @@ def _setup_key(self, ssh_key: str | None) -> None: if ssh_key is None: return ssh_path = self._ssh_path() - os.makedirs(ssh_path, exist_ok=True) - key_path = os.path.join(ssh_path, self.get_id()) + ".key" - was_here = os.path.isfile(key_path) - with open(key_path, "w", encoding="utf-8") as ssh_key_file: - ssh_key_file.write(ssh_key) - os.chmod(key_path, 0o700) + ssh_path.mkdir(parents=True, exist_ok=True) + key_path = ssh_path / f"{self.get_id()}.key" + was_here = key_path.is_file() + key_path.write_text(ssh_key, encoding="utf-8") + key_path.chmod(0o700) if not was_here: - with open(os.path.join(ssh_path, "config"), "a", encoding="utf-8") as config: + with (ssh_path / "config").open("a", encoding="utf-8") as config: config.write(f"IdentityFile {key_path}\n") @staticmethod - def _ssh_path() -> str: - return os.path.join(os.environ["HOME"], ".ssh") + def _ssh_path() -> Path: + return Path(os.environ["HOME"]) / ".ssh" def get_stats(self) -> SourceStatus: stats = super().get_stats() @@ -56,10 +42,10 @@ def delete(self) -> None: ssh_key = self._config.get("ssh_key") if ssh_key is not None: ssh_path = self._ssh_path() - key_path = os.path.join(ssh_path, self.get_id()) + ".key" - if os.path.isfile(key_path): - os.remove(key_path) - with fileinput.input(os.path.join(ssh_path, "config"), inplace=True) as config: + key_path = ssh_path / self.get_id() + ".key" + if key_path.isfile(): + key_path.unlink() + with fileinput.input(ssh_path / "config", inplace=True) as config: for line in config: if line != f"IdentityFile {key_path}\n": print(line, end="") diff --git a/app/shared_config_manager/template_engines/base.py b/app/shared_config_manager/template_engines/base.py index 3d2f0b8a..9918b691 100644 --- a/app/shared_config_manager/template_engines/base.py +++ b/app/shared_config_manager/template_engines/base.py @@ -1,5 +1,6 @@ import logging import os +from pathlib import Path from typing import cast from prometheus_client import Counter, Gauge @@ -32,7 +33,7 @@ def __init__(self, source_id: str, config: TemplateEnginesConfig, extension: str else: self._data = config.get("data", {}) - def evaluate(self, root_dir: str, files: list[str]) -> None: + def evaluate(self, root_dir: Path, files: list[Path]) -> None: extension_len = len(self._extension) + 1 dest_dir = self._get_dest_dir(root_dir) _LOG.info( @@ -43,10 +44,10 @@ def evaluate(self, root_dir: str, files: list[str]) -> None: ) for sub_path in files: - src_path = os.path.join(root_dir, sub_path) - dest_path = os.path.join(dest_dir, sub_path) - os.makedirs(os.path.dirname(dest_path), exist_ok=True) - if src_path.endswith("." + self._extension): + src_path = root_dir / sub_path + dest_path = dest_dir / sub_path + dest_path.parent.mkdir(parents=True, exist_ok=True) + if src_path.suffix == "." + self._extension: dest_path = dest_path[:-extension_len] _LOG.debug("Evaluating template: %s -> %s", src_path, dest_path) try: @@ -61,15 +62,15 @@ def evaluate(self, root_dir: str, files: list[str]) -> None: ) _ERROR_COUNTER.labels(source=self._source_id, type=self.get_type()).inc() _ERROR_GAUGE.labels(source=self._source_id, type=self.get_type()).set(1) - elif src_path != dest_path and not os.path.isdir(src_path) and not os.path.exists(dest_path): + elif src_path != dest_path and not src_path.is_dir() and not dest_path.exists(): os.link(src_path, dest_path) - def _get_dest_dir(self, root_dir: str) -> str: + def _get_dest_dir(self, root_dir: Path) -> Path: if "dest_sub_dir" in self._config: - return os.path.join(root_dir, self._config["dest_sub_dir"]) + return root_dir / self._config["dest_sub_dir"] return root_dir - def _evaluate_file(self, src_path: str, dst_path: str) -> None: + def _evaluate_file(self, src_path: Path, dst_path: Path) -> None: pass def get_type(self) -> str: @@ -81,8 +82,4 @@ def get_stats(self, stats: TemplateEnginesStatus) -> None: def _filter_env(env: dict[str, str]) -> dict[str, str]: - result = {} - for key, value in env.items(): - if any(key.startswith(i) for i in _ENV_PREFIXES): - result[key] = value - return result + return {key: value for key, value in env.items() if any(key.startswith(i) for i in _ENV_PREFIXES)} diff --git a/app/shared_config_manager/template_engines/mako.py b/app/shared_config_manager/template_engines/mako.py index b8eeef76..4b0da43f 100644 --- a/app/shared_config_manager/template_engines/mako.py +++ b/app/shared_config_manager/template_engines/mako.py @@ -1,3 +1,5 @@ +from pathlib import Path + import mako.template from shared_config_manager.configuration import TemplateEnginesConfig @@ -10,7 +12,7 @@ class MakoEngine(BaseEngine): def __init__(self, source_id: str, config: TemplateEnginesConfig) -> None: super().__init__(source_id, config, "mako") - def _evaluate_file(self, src_path: str, dst_path: str) -> None: + def _evaluate_file(self, src_path: Path, dst_path: Path) -> None: template = mako.template.Template(filename=src_path) # nosec # noqa: S702 - with open(dst_path, "w", encoding="utf-8") as output: + with dst_path.open("w", encoding="utf-8") as output: output.write(template.render(**self._data)) diff --git a/app/shared_config_manager/template_engines/shell.py b/app/shared_config_manager/template_engines/shell.py index e996c380..71409f34 100644 --- a/app/shared_config_manager/template_engines/shell.py +++ b/app/shared_config_manager/template_engines/shell.py @@ -1,4 +1,5 @@ import subprocess # nosec +from pathlib import Path from shared_config_manager.configuration import TemplateEnginesConfig from shared_config_manager.template_engines.base import BaseEngine @@ -10,6 +11,6 @@ class ShellEngine(BaseEngine): def __init__(self, source_id: str, config: TemplateEnginesConfig) -> None: super().__init__(source_id, config, "tmpl") - def _evaluate_file(self, src_path: str, dst_path: str) -> None: - with open(src_path, encoding="utf-8") as input_, open(dst_path, "w", encoding="utf-8") as output: + def _evaluate_file(self, src_path: Path, dst_path: Path) -> None: + with src_path.open(encoding="utf-8") as input_, dst_path.open("w", encoding="utf-8") as output: subprocess.run(["envsubst"], stdin=input_, stdout=output, env=self._data, check=True) # nosec diff --git a/app/shared_config_manager/views.py b/app/shared_config_manager/views.py index bf4fcb83..8ad5e3a6 100644 --- a/app/shared_config_manager/views.py +++ b/app/shared_config_manager/views.py @@ -3,6 +3,7 @@ import os.path import re import subprocess # nosec +from pathlib import Path from typing import Any, cast import pyramid.request @@ -44,7 +45,7 @@ def _is_valid(source: BaseSource) -> bool: return True -@view_config(route_name="ui_index", renderer="./templates/index.html.mako") # type: ignore +@view_config(route_name="ui_index", renderer="./templates/index.html.mako") # type: ignore[misc] def _ui_index(request: pyramid.request.Request) -> dict[str, Any]: permission = request.has_permission("all", {}) is_admin = isinstance(permission, Allowed) @@ -63,7 +64,7 @@ def _ui_index(request: pyramid.request.Request) -> dict[str, Any]: return {"sources": sources_list, "is_valid": _is_valid} -@view_config(route_name="ui_source", renderer="./templates/source.html.mako") # type: ignore +@view_config(route_name="ui_source", renderer="./templates/source.html.mako") # type: ignore[misc] def _ui_source(request: pyramid.request.Request) -> dict[str, Any]: def key_format(key: str) -> str: return key[0].upper() + key[1:].replace("_", " ") @@ -74,11 +75,13 @@ def key_format(key: str) -> str: id_ = request.matchdict["id"] source, filtered = registry.get_source_check_auth(id_=id_, request=request) if source is None: - raise HTTPNotFound(f"Unknown id {id_} or forbidden") + message = f"Unknown id {id_} or forbidden" + raise HTTPNotFound(message) if not is_admin: permission = request.has_permission(id_, source.get_config()) if not isinstance(permission, Allowed): - raise HTTPNotFound(f"Unknown id {id_} or forbidden") + message = f"Unknown id {id_} or forbidden" + raise HTTPNotFound(message) slaves = slave_status.get_source_status(id_=id_) assert slaves is not None @@ -155,7 +158,7 @@ def key_format(key: str) -> str: commit_details = ( subprocess.run( # type: ignore[assignment] # nosec ["git", "show", "--quiet", slave["hash"]], - cwd=os.path.join("/repos", source.get_id()), + cwd=str(Path("/repos") / source.get_id()), check=True, stdout=subprocess.PIPE, ) @@ -163,7 +166,7 @@ def key_format(key: str) -> str: .split("\n") ) _slave_status.append((slave, commit_details)) - except Exception: # pylint: disable=broad-exception-caught + except Exception: # pylint: disable=broad-exception-caught # noqa: PERF203 _LOG.warning("Unable to get the commit status for %s", slave.get("hash"), exc_info=True) _slave_status.append((slave, [])) diff --git a/app/tests/template_engines/test_shell.py b/app/tests/template_engines/test_shell.py index 05aef3cf..8b6fa721 100644 --- a/app/tests/template_engines/test_shell.py +++ b/app/tests/template_engines/test_shell.py @@ -1,5 +1,4 @@ import os -import pathlib from shared_config_manager import template_engines @@ -11,11 +10,11 @@ def test_ok(temp_dir) -> None: {"type": "shell", "environment_variables": True, "data": {"param": "world"}}, ) - file_path = os.path.join(temp_dir, "file1") + file_path = temp_dir / "file1" with open(file_path + ".tmpl", "w") as out: out.write("Hello ${param} ${MUTUALIZED_TEST_ENV}\n") - files = [os.path.relpath(str(p), temp_dir) for p in pathlib.Path(temp_dir).glob("**/*")] + files = [p.relpath(temp_dir) for p in temp_dir.glob("**/*")] engine.evaluate(temp_dir, files) with open(file_path) as input: @@ -29,18 +28,18 @@ def test_dest_sub_dir(temp_dir) -> None: {"type": "shell", "dest_sub_dir": "copy", "environment_variables": True, "data": {"param": "world"}}, ) - file_path = os.path.join(temp_dir, "file1") + file_path = temp_dir / "file1" with open(file_path + ".tmpl", "w") as out: out.write("Hello ${param} ${MUTUALIZED_TEST_ENV}\n") - with open(os.path.join(temp_dir, "file2"), "w") as out: + with (temp_dir / "file2").open("w") as out: out.write("Hello\n") - files = [os.path.relpath(str(p), temp_dir) for p in pathlib.Path(temp_dir).glob("**/*")] + files = [p.relpath(temp_dir) for p in temp_dir.glob("**/*")] engine.evaluate(temp_dir, files) - with open(os.path.join(temp_dir, "copy", "file1")) as input: + with open(temp_dir / "copy", "file1") as input: assert input.read() == "Hello world yall\n" - with open(os.path.join(temp_dir, "copy", "file2")) as input: + with open(temp_dir / "copy" / "file2") as input: assert input.read() == "Hello\n" - assert not os.path.exists(os.path.join(temp_dir, "copy", "copy")) + assert not (temp_dir / "copy" / "copy").exists()