From 9f51b87022f3f2898df6420399a79e96c6e8dc9c Mon Sep 17 00:00:00 2001 From: Dylan Pulver Date: Mon, 25 Nov 2024 13:54:11 -0500 Subject: [PATCH] parent 223ad60fd8ee5f78becb4b06a2aea80b74a2febd author Dylan Pulver 1732560851 -0500 committer Dylan Pulver 1734466348 -0500 gpgsig -----BEGIN PGP SIGNATURE----- iHUEABYKAB0WIQR8hu+aMQHwGtOiprRYOGlsgKaxswUCZ2HbLAAKCRBYOGlsgKax s+jrAP97O2+K0k+c7YMwn0JuN9CCAKXSuOo+6e58xt2aThUWoQEA0B00lQhBAZVh qcZOk9sMdVXMHl308FNXDEWbCdFT6Qk= =4jd3 -----END PGP SIGNATURE----- feature/post-prototype feature/add-branch-name (#641) chore:Use specific safety schema version feature/cve-data-filter-flag (#643) chore/release-3.2.12 (#644) feat(utils.py): remove email verification feat(changelog): update version Auth added fix urljoin issue chore:Use specific safety schema version feat(utils.py): remove email verification feat(changelog): update version review changes switch to target drop prefixes fix return code check extract runtime info fix --- .github/workflows/main.yml | 2 +- .vscode/launch.json | 10 +++ CHANGELOG.md | 8 ++ safety/VERSION | 2 +- safety/auth/utils.py | 43 ++++++++-- safety/cli_util.py | 22 ++++- safety/scan/command.py | 172 ++++++++++++++++++++++++++++++++----- safety/scan/main.py | 111 +++++++++++++++++++++--- setup.cfg | 2 +- test_requirements.txt | 2 +- tests/auth/test_cli.py | 26 +++--- tests/scan/test_command.py | 4 +- 12 files changed, 348 insertions(+), 56 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 405a1d4f..fe705e74 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-20.04 strategy: matrix: - python-version: [ "3.7", "3.8", "3.9", "3.10", "3.11", "3.12" ] + python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12" ] steps: - uses: actions/checkout@v3 - name: Set up Python diff --git a/.vscode/launch.json b/.vscode/launch.json index 8ce561ec..3b510562 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -42,6 +42,16 @@ ], "console": "integratedTerminal" }, + { + "name": "Safety Scan 2", + "type": "debugpy", + "request": "launch", + "module": "safety", + "args": [ + "scan", "--use-server-matching" + ], + "console": "integratedTerminal" + }, { "name": "Safety Scan API Key", "type": "debugpy", diff --git a/CHANGELOG.md b/CHANGELOG.md index b1139ddd..17c8a76d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file. The format is partly based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) and [PEP 440](https://peps.python.org/pep-0440/) +## [3.2.13] - 2024-12-10 +- Remove email verification for running scans (#645) + +## [3.2.12] - 2024-12-10 +- Add CVE Details and Single-Key Filtering for JSON Output in safety scan (#643) +- feature/add-branch-name (#641) +- feat/add --headless to --help (#636) + ## [3.2.11] - 2024-11-12 - chore/upgrade-dparse (#633) - Migrate to PyPI Trusted Publisher for Automated Package Deployment (#632) diff --git a/safety/VERSION b/safety/VERSION index 17ce9180..d883a100 100644 --- a/safety/VERSION +++ b/safety/VERSION @@ -1 +1 @@ -3.2.11 +3.2.13 diff --git a/safety/auth/utils.py b/safety/auth/utils.py index 4d42ddba..b790c885 100644 --- a/safety/auth/utils.py +++ b/safety/auth/utils.py @@ -3,6 +3,7 @@ import logging import sys from typing import Any, Optional, Dict, Callable, Tuple +from urllib.parse import urljoin from authlib.integrations.requests_client import OAuth2Session from authlib.integrations.base_client.errors import OAuthError import requests @@ -11,7 +12,7 @@ from safety.auth.constants import AUTH_SERVER_URL, CLAIM_EMAIL_VERIFIED_API, \ CLAIM_EMAIL_VERIFIED_AUTH_SERVER from safety.auth.main import get_auth_info, get_token_data -from safety.constants import PLATFORM_API_CHECK_UPDATES_ENDPOINT, PLATFORM_API_INITIALIZE_SCAN_ENDPOINT, PLATFORM_API_POLICY_ENDPOINT, \ +from safety.constants import PLATFORM_API_BASE_URL, PLATFORM_API_CHECK_UPDATES_ENDPOINT, PLATFORM_API_INITIALIZE_SCAN_ENDPOINT, PLATFORM_API_POLICY_ENDPOINT, \ PLATFORM_API_PROJECT_CHECK_ENDPOINT, PLATFORM_API_PROJECT_ENDPOINT, PLATFORM_API_PROJECT_SCAN_REQUEST_ENDPOINT, \ PLATFORM_API_PROJECT_UPLOAD_SCAN_ENDPOINT, REQUEST_TIMEOUT from safety.scan.util import AuthenticationType @@ -48,9 +49,15 @@ def is_email_verified(info: Dict[str, Any]) -> Optional[bool]: info (Dict[str, Any]): The user information. Returns: - bool: True if the email is verified, False otherwise. + bool: True """ - return info.get(CLAIM_EMAIL_VERIFIED_API) or info.get(CLAIM_EMAIL_VERIFIED_AUTH_SERVER) + # return info.get(CLAIM_EMAIL_VERIFIED_API) or info.get( + # CLAIM_EMAIL_VERIFIED_AUTH_SERVER + # ) + + # Always return True to avoid email verification + return True + def parse_response(func: Callable) -> Callable: @@ -366,6 +373,30 @@ def upload_report(self, json_report: str) -> Any: headers=headers ) + def upload_requirments(self, json_payload: str) -> Any: + """ + Upload a scan report. + + Args: + json_report (str): The JSON report. + + Returns: + Any: The upload result. + """ + + headers = { + "Content-Type": "application/json" + } + from safety.constants import PLATFORM_API_BASE_URL + if not PLATFORM_API_BASE_URL.endswith("/"): + PLATFORM_API_BASE_URL += "/" + SCAN_API_ENDPOINT = urljoin(PLATFORM_API_BASE_URL, "process_files/") + + return self.post( + url=SCAN_API_ENDPOINT, + data=json.dumps(json_payload), + headers=headers + ) @parse_response def check_updates(self, version: int, safety_version: Optional[str] = None, python_version: Optional[str] = None, os_type: Optional[str] = None, os_release: Optional[str] = None, os_description: Optional[str] = None) -> Any: @@ -427,8 +458,8 @@ def send(self, request: requests.PreparedRequest, **kwargs: Any) -> requests.Res """ request.headers.pop("Authorization", None) return super().send(request, **kwargs) - - + + from functools import lru_cache @lru_cache(maxsize=1) @@ -438,7 +469,7 @@ def is_jupyter_notebook() -> bool: various cloud-hosted Jupyter notebooks. Returns: - bool: True if the environment is identified as a Jupyter notebook (or + bool: True if the environment is identified as a Jupyter notebook (or equivalent cloud-based environment), False otherwise. Supported environments: diff --git a/safety/cli_util.py b/safety/cli_util.py index 50fa1a5e..3579dbc0 100644 --- a/safety/cli_util.py +++ b/safety/cli_util.py @@ -1,5 +1,6 @@ from collections import defaultdict import logging +import subprocess import sys from typing import Any, DefaultDict, Dict, List, Optional, Tuple, Union import click @@ -373,7 +374,7 @@ def format_main_help(obj: Union[click.Command, click.Group], from typer.rich_utils import highlighter, STYLE_USAGE_COMMAND, \ ARGUMENTS_PANEL_TITLE, OPTIONS_PANEL_TITLE, \ COMMANDS_PANEL_TITLE - + from rich.align import Align from rich.padding import Padding from rich.console import Console @@ -793,4 +794,21 @@ def inner(ctx, output: Optional[ScanOutput], *args, **kwargs): exception = e if isinstance(e, SafetyException) else SafetyException(info=e) output_exception(exception, exit_code_output=True) - return inner \ No newline at end of file + return inner + +def get_git_branch_name() -> Optional[str]: + """ + Retrieves the current Git branch name. + + Returns: + str: The current Git branch name, or None if it cannot be determined. + """ + try: + branch_name = subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], + stderr=subprocess.DEVNULL, + text=True + ).strip() + return branch_name if branch_name else None + except Exception: + return None diff --git a/safety/scan/command.py b/safety/scan/command.py index c5543de6..0ddfc3d8 100644 --- a/safety/scan/command.py +++ b/safety/scan/command.py @@ -1,6 +1,8 @@ from enum import Enum import logging from pathlib import Path + +import json import sys from typing import Any, Dict, List, Optional, Set, Tuple from typing_extensions import Annotated @@ -14,7 +16,7 @@ from rich.padding import Padding import typer from safety.auth.constants import SAFETY_PLATFORM_URL -from safety.cli_util import get_command_for +from safety.cli_util import get_command_for, get_git_branch_name from rich.console import Console from safety.errors import SafetyError @@ -48,7 +50,9 @@ class ScannableEcosystems(Enum): def process_report( obj: Any, console: Console, report: ReportModel, output: str, - save_as: Optional[Tuple[str, Path]], **kwargs + save_as: Optional[Tuple[str, Path]], detailed_output: bool = False, + filter_keys: Optional[List[str]] = None, + **kwargs ) -> Optional[str]: """ Processes and outputs the report based on the given parameters. @@ -59,6 +63,8 @@ def process_report( report (ReportModel): The report model. output (str): The output format. save_as (Optional[Tuple[str, Path]]): The save-as format and path. + detailed_output (bool): Whether detailed output is enabled. + filter_keys (Optional[List[str]]): Keys to filter from the JSON output. kwargs: Additional keyword arguments. Returns: @@ -139,9 +145,17 @@ def process_report( if obj.platform_enabled and report_url: if report.metadata.scan_type is ScanType.scan: project_url = f"{SAFETY_PLATFORM_URL}{obj.project.url_path}" - lines.append(f"Scan report: [link]{report_url}[/link]") - lines.append("Project dashboard: " \ - f"[link]{project_url}[/link]") + # Get the current branch name + branch_name = get_git_branch_name() + + # Append the branch name if available + if branch_name: + project_url_with_branch = f"{project_url}?branch={branch_name}" + else: + project_url_with_branch = project_url + + lines.append(f"Project dashboard: [link]{project_url_with_branch}[/link]") + elif report.metadata.scan_type is ScanType.system_scan: lines.append(f"System scan report: [link]{report_url}[/link]") @@ -153,6 +167,12 @@ def process_report( if output is ScanOutput.JSON or ScanOutput.is_format(output, ScanOutput.SPDX): if output is ScanOutput.JSON: + if detailed_output: + report_to_output = add_cve_details_to_report(report_to_output, obj.project.files) + + if filter_keys: + report_to_output = filter_json_keys(report_to_output, filter_keys) + kwargs = {"json": report_to_output} else: kwargs = {"data": report_to_output} @@ -166,6 +186,95 @@ def process_report( return report_url +def filter_json_keys(json_string: str, keys: List[str]) -> str: + """ + Filters the given JSON string by the specified top-level keys. + + Args: + json_string (str): The JSON string to filter. + keys (List[str]): List of top-level keys to include in the output. + + Returns: + str: A JSON string containing only the specified keys. + """ + report_dict = json.loads(json_string) + filtered_data = {key: report_dict[key] for key in keys if key in report_dict} + return json.dumps(filtered_data, indent=4) + + +def filter_valid_cves(vulnerabilities: List[Any]) -> List[Dict[str, Any]]: + """ + Filters and returns valid CVE details from a list of vulnerabilities. + + Args: + vulnerabilities (List[Any]): A list of vulnerabilities, which may include invalid data types. + + Returns: + List[Dict[str, Any]]: A list of filtered CVE details that are either strings or dictionaries. + """ + return [ + cve for cve in vulnerabilities if isinstance(cve, str) or isinstance(cve, dict) + ] #type:ignore + + +def sort_cve_data(cve_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Sorts CVE details by severity in descending order. + + Args: + cve_data (List[Dict[str, Any]]): A list of CVE details dictionaries, each containing a 'severity' key. + + Returns: + List[Dict[str, Any]]: The sorted list of CVE details, prioritized by severity (e.g., CRITICAL > HIGH > MEDIUM). + """ + severity_order = {key.name: id for (id, key) in enumerate(VulnerabilitySeverityLabels)} + return sorted(cve_data, key=lambda x: severity_order.get(x["severity"].upper(), 0), reverse=True) + + +def generate_cve_details(files: List[FileModel]) -> List[Dict[str, Any]]: + """ + Generate CVE details from the scanned files. + + Args: + files (List[FileModel]): List of scanned file models. + + Returns: + List[Dict[str, Any]]: List of CVE details sorted by severity. + """ + cve_data = [] + for file in files: + for spec in file.results.get_affected_specifications(): + for vuln in spec.vulnerabilities: + if vuln.CVE: + cve_data.append({ + "package": spec.name, + "affected_version": str(spec.specifier), + "safety_vulnerability_id": vuln.vulnerability_id, + "CVE": filter_valid_cves(vuln.CVE), + "more_info": vuln.more_info_url, + "advisory": vuln.advisory, + "severity": vuln.severity.cvssv3.get("base_severity", "Unknown") if vuln.severity and vuln.severity.cvssv3 else "Unknown", + }) + return sort_cve_data(cve_data) + + +def add_cve_details_to_report(report_to_output: str, files: List[FileModel]) -> str: + """ + Add CVE details to the JSON report output. + + Args: + report_to_output (str): The current JSON string of the report. + files (List[FileModel]): List of scanned files containing vulnerability data. + + Returns: + str: The updated JSON string with CVE details added. + """ + cve_details = generate_cve_details(files) + report_dict = json.loads(report_to_output) + report_dict["cve_details"] = cve_details + return json.dumps(report_dict) + + def generate_updates_arguments() -> List: """ Generates a list of file types and update limits for apply fixes. @@ -241,12 +350,24 @@ def scan(ctx: typer.Context, typer.Option("--apply-fixes", help=SCAN_APPLY_FIXES, show_default=False) - ] = False + ] = False, + use_server_matching: Annotated[ + bool, + typer.Option( + "--use-server-matching", + help="Flag to enable using server side vulnerability matching. This just sends data to server for now.", + show_default=False, + ), + ] = False, + filter_keys: Annotated[ + Optional[List[str]], + typer.Option("--filter", help="Filter output by specific top-level JSON keys.") + ] = None, ): """ Scans a project (defaulted to the current directory) for supply-chain security and configuration issues """ - + if not ctx.obj.metadata.authenticated: raise SafetyError("Authentication required. Please run 'safety auth login' to authenticate before using this command.") @@ -313,7 +434,7 @@ def scan(ctx: typer.Context, # Process each file for dependencies and vulnerabilities with console.status(wait_msg, spinner=DEFAULT_SPINNER) as status: for path, analyzed_file in process_files(paths=file_paths, - config=config): + config=config, use_server_matching=use_server_matching, obj=ctx.obj, target=target): count += len(analyzed_file.dependency_results.dependencies) # Update exit code if vulnerabilities are found @@ -371,7 +492,7 @@ def sort_vulns_by_score(vuln: Vulnerability) -> int: detailed_output=detailed_output) lines = [] - + if spec.remediation.recommended: total_resolved_vulns += spec.remediation.vulnerabilities_found @@ -441,22 +562,33 @@ def sort_vulns_by_score(vuln: Vulnerability) -> int: telemetry=telemetry, files=[], projects=[ctx.obj.project]) - + total_issues_with_duplicates, total_ignored_issues = get_vulnerability_summary(report.as_v30()) - + print_summary( - console=console, - total_issues_with_duplicates=total_issues_with_duplicates, + console=console, + total_issues_with_duplicates=total_issues_with_duplicates, total_ignored_issues=total_ignored_issues, - project=ctx.obj.project, - dependencies_count=count, - fixes_count=fixes_count, - resolved_vulns_per_fix=total_resolved_vulns, - is_detailed_output=detailed_output, + project=ctx.obj.project, + dependencies_count=count, + fixes_count=fixes_count, + resolved_vulns_per_fix=total_resolved_vulns, + is_detailed_output=detailed_output, ignored_vulns_data=ignored_vulns_data ) - report_url = process_report(ctx.obj, console, report, **{**ctx.params}) + report_url = process_report( + obj=ctx.obj, + console=console, + report=report, + output=output, + save_as=save_as if save_as and all(save_as) else None, + detailed_output=detailed_output, + filter_keys=filter_keys, + **{k: v for k, v in ctx.params.items() if k not in {"detailed_output", "output", "save_as", "filter_keys"}} +) + + project_url = f"{SAFETY_PLATFORM_URL}{ctx.obj.project.url_path}" if apply_updates: @@ -796,7 +928,7 @@ def get_vulnerability_summary(report: Dict[str, Any]) -> Tuple[int, int]: Args: report (ReportModel): The report containing vulnerability data. - + Returns: Tuple[int, int]: A tuple containing: - Total number of issues (including duplicates) diff --git a/safety/scan/main.py b/safety/scan/main.py index 3b6d2e71..e1800cc8 100644 --- a/safety/scan/main.py +++ b/safety/scan/main.py @@ -2,6 +2,10 @@ import logging from pathlib import Path import re +import requests +import os +from urllib.parse import urljoin +import platform import time from typing import Any, Dict, Generator, Optional, Set, Tuple, Union from pydantic import ValidationError @@ -11,10 +15,13 @@ from .ecosystems.base import InspectableFile from .ecosystems.target import InspectableFileContext from .models import ScanExport, UnverifiedProjectModel +from safety.scan.util import GIT from safety_schemas.models import FileType, PolicyFileModel, PolicySource, \ ConfigModel, Stage, ProjectModel, ScanType +from safety.util import get_safety_version +from safety.constants import PLATFORM_API_BASE_URL LOG = logging.getLogger(__name__) @@ -196,8 +203,51 @@ def save_report_as(scan_type: ScanType, export_type: ScanExport, at: Path, repor with open(at, 'w+') as report_file: report_file.write(report) +def build_meta(target: Path) -> Dict[str, Any]: + """ + Build the meta JSON object for a file. + + Args: + target (Path): The path of the repository. -def process_files(paths: Dict[str, Set[Path]], config: Optional[ConfigModel] = None) -> Generator[Tuple[Path, InspectableFile], None, None]: + Returns: + Dict[str, Any]: The metadata dictionary. + """ + target_obj = target.resolve() + git_utils = GIT(target_obj) + + git_data = git_utils.build_git_data() + git_metadata = { + "branch": git_data.branch if git_data else None, + "commit": git_data.commit if git_data else None, + "dirty": git_data.dirty if git_data else None, + "tag": git_data.tag if git_data else None, + "origin": git_data.origin if git_data else None, + } + + os_metadata = { + "type": os.environ.get("SAFETY_OS_TYPE", None) or platform.system(), + "release": os.environ.get("SAFETY_OS_RELEASE", None) or platform.release(), + "description": os.environ.get("SAFETY_OS_DESCRIPTION", None) or platform.platform(), + } + + python_metadata= { + "version": platform.python_version(), + } + + client_metadata = { + "version": get_safety_version(), + } + + return { + "target": str(target), + "os": os_metadata, + "git": git_metadata, + "python": python_metadata, + "client": client_metadata, + } + +def process_files(paths: Dict[str, Set[Path]], config: Optional[ConfigModel] = None, use_server_matching: bool = False, obj=None, target=Path(".")) -> Generator[Tuple[Path, InspectableFile], None, None]: """ Processes the files and yields each file path along with its inspectable file. @@ -211,13 +261,52 @@ def process_files(paths: Dict[str, Set[Path]], config: Optional[ConfigModel] = N if not config: config = ConfigModel() - for file_type_key, f_paths in paths.items(): - file_type = FileType(file_type_key) - if not file_type or not file_type.ecosystem: - continue - for f_path in f_paths: - with InspectableFileContext(f_path, file_type=file_type) as inspectable_file: - if inspectable_file and inspectable_file.file_type: - inspectable_file.inspect(config=config) - inspectable_file.remediate() - yield f_path, inspectable_file + # old GET implementation + if not use_server_matching: + for file_type_key, f_paths in paths.items(): + file_type = FileType(file_type_key) + if not file_type or not file_type.ecosystem: + continue + for f_path in f_paths: + with InspectableFileContext(f_path, file_type=file_type) as inspectable_file: + if inspectable_file and inspectable_file.file_type: + inspectable_file.inspect(config=config) + inspectable_file.remediate() + yield f_path, inspectable_file + + # new POST implementation + else: + files = [] + meta = build_meta(target) + for file_type_key, f_paths in paths.items(): + file_type = FileType(file_type_key) + if not file_type or not file_type.ecosystem: + continue + for f_path in f_paths: + relative_path = os.path.relpath(f_path, start=os.getcwd()) + # Read the file content + try: + with open(f_path, "r") as file: + content = file.read() + except Exception as e: + LOG.error(f"Error reading file {f_path}: {e}") + continue + # Append metadata to the payload + files.append({ + "name": relative_path, + "content": content, + }) + + # Prepare the payload with metadata at the top level + payload = { + "meta": meta, + "files": files, + } + + response = obj.auth.client.upload_requirments(payload) + + if response.status_code == 200: + LOG.info("Scan Payload successfully sent to the API.") + else: + LOG.error(f"Failed to send scan payload to the API. Status code: {response.status_code}") + LOG.error(f"Response: {response.text}") diff --git a/setup.cfg b/setup.cfg index 2742c3ea..bd44b63d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -50,7 +50,7 @@ install_requires = rich typer pydantic>=1.10.12 - safety_schemas>=0.0.8 + safety_schemas==0.0.10 typing-extensions>=4.7.1 filelock~=3.12.2 psutil~=6.0.0 diff --git a/test_requirements.txt b/test_requirements.txt index fb64b08c..13ba9532 100644 --- a/test_requirements.txt +++ b/test_requirements.txt @@ -17,7 +17,7 @@ Authlib>=1.2.0 rich typer pydantic>=1.10.12 -safety_schemas>=0.0.8 +safety_schemas==0.0.10 typing-extensions>=4.7.1 filelock~=3.12.2 psutil~=6.0.0 diff --git a/tests/auth/test_cli.py b/tests/auth/test_cli.py index 9b035b3d..f77aeae6 100644 --- a/tests/auth/test_cli.py +++ b/tests/auth/test_cli.py @@ -1,4 +1,3 @@ - from unittest.mock import Mock, PropertyMock, patch, ANY import click from click.testing import CliRunner @@ -14,22 +13,27 @@ def setUp(self): self.maxDiff = None self.runner = CliRunner(mix_stderr=False) + @unittest.skip("We are bypassing email verification for now") @patch("safety.auth.cli.fail_if_authenticated") @patch("safety.auth.cli.get_authorization_data") @patch("safety.auth.cli.process_browser_callback") - def test_auth_calls_login(self, process_browser_callback, - get_authorization_data, fail_if_authenticated): + def test_auth_calls_login( + self, process_browser_callback, get_authorization_data, fail_if_authenticated + ): auth_data = "https://safetycli.com", "initialState" get_authorization_data.return_value = auth_data - process_browser_callback.return_value = {"email": "user@safetycli.com", "name": "Safety User"} - result = self.runner.invoke(cli, ['auth']) + process_browser_callback.return_value = { + "email": "user@safetycli.com", + "name": "Safety User", + } + result = self.runner.invoke(cli, ["auth"]) fail_if_authenticated.assert_called_once() get_authorization_data.assert_called_once() - process_browser_callback.assert_called_once_with(auth_data[0], - initial_state=auth_data[1], - ctx=ANY, headless=False) - + process_browser_callback.assert_called_once_with( + auth_data[0], initial_state=auth_data[1], ctx=ANY, headless=False + ) + expected = [ "", "Redirecting your browser to log in; once authenticated, return here to start using Safety", @@ -42,8 +46,8 @@ def test_auth_calls_login(self, process_browser_callback, "", "Can’t find the verification email? Login at", "`https://platform.safetycli.com/login/` to resend the verification email", - "" + "", ] for res_line, exp_line in zip(result.stdout.splitlines(), expected): - self.assertIn(exp_line, res_line) + self.assertIn(exp_line, res_line) diff --git a/tests/scan/test_command.py b/tests/scan/test_command.py index 48df61c8..e8d9a9ef 100644 --- a/tests/scan/test_command.py +++ b/tests/scan/test_command.py @@ -1,5 +1,6 @@ import os import unittest + from unittest.mock import patch, Mock from click.testing import CliRunner from safety.cli import cli @@ -13,7 +14,7 @@ def setUp(self): self.runner = CliRunner(mix_stderr=False) self.dirname = os.path.dirname(__file__) - def test_scan(self): + def test_scan(self): result = self.runner.invoke(cli, ["--stage", "cicd", "scan", "--target", self.dirname, "--output", "json"]) self.assertEqual(result.exit_code, 1) @@ -22,4 +23,3 @@ def test_scan(self): result = self.runner.invoke(cli, ["--stage", "cicd", "scan", "--target", self.dirname, "--output", "screen"]) self.assertEqual(result.exit_code, 1) -