Skip to content

Commit

Permalink
feat: added safety firewall
Browse files Browse the repository at this point in the history
  • Loading branch information
jakub-safetycli authored and yeisonvargasf committed Jan 22, 2025
1 parent f97fb15 commit df33f3c
Show file tree
Hide file tree
Showing 27 changed files with 2,197 additions and 266 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
{
"cells": [],
"cells": [
{
"metadata": {},
"cell_type": "raw",
"source": "",
"id": "e4a30302820cf149"
}
],
"metadata": {},
"nbformat": 4,
"nbformat_minor": 5
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ dependencies = [
"setuptools>=65.5.1",
"typer>=0.12.1",
"typing-extensions>=4.7.1",
"python-levenshtein>=0.25.1",
]
license = "MIT"
license-files = ["LICENSES/*"]
Expand Down
238 changes: 177 additions & 61 deletions safety/cli.py

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions safety/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def get_user_dir() -> Path:
CACHE_FILE_DIR = USER_CONFIG_DIR / f"{JSON_SCHEMA_VERSION.replace('.', '')}"
DB_CACHE_FILE = CACHE_FILE_DIR / "cache.json"

PIP_LOCK = USER_CONFIG_DIR / "pip.lock"

CONFIG_FILE_NAME = "config.ini"
CONFIG_FILE_SYSTEM = SYSTEM_CONFIG_DIR / CONFIG_FILE_NAME if SYSTEM_CONFIG_DIR else None
CONFIG_FILE_USER = USER_CONFIG_DIR / CONFIG_FILE_NAME
Expand Down
Empty file added safety/init/__init__.py
Empty file.
96 changes: 96 additions & 0 deletions safety/init/command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from pathlib import Path

from rich.prompt import Prompt
from ..cli_util import SafetyCLICommand, SafetyCLISubGroup
import typer
import os

from safety.scan.decorators import initialize_scan
from safety.init.constants import PROJECT_INIT_CMD_NAME, PROJECT_INIT_HELP, PROJECT_INIT_DIRECTORY_HELP
from safety.init.main import create_project
from safety.console import main_console as console
from ..scan.command import scan
from ..scan.models import ScanOutput
from ..tool.main import configure_system, configure_local_directory, has_local_tool_files, configure_alias

try:
from typing import Annotated
except ImportError:
from typing_extensions import Annotated

init_app = typer.Typer(rich_markup_mode= "rich", cls=SafetyCLISubGroup)

@init_app.command(
cls=SafetyCLICommand,
help=PROJECT_INIT_HELP,
name=PROJECT_INIT_CMD_NAME,
options_metavar="[OPTIONS]",
context_settings={
"allow_extra_args": True,
"ignore_unknown_options": True
},
)
def init(ctx: typer.Context,
directory: Annotated[
Path,
typer.Argument(
exists=True,
file_okay=False,
dir_okay=True,
writable=False,
readable=True,
resolve_path=True,
show_default=False,
help=PROJECT_INIT_DIRECTORY_HELP
),
] = Path(".")):

do_init(ctx, directory, False)


def do_init(ctx: typer.Context, directory: Path, prompt_user: bool = True):
project_dir = directory if os.path.isabs(directory) else os.path.join(os.getcwd(), directory)
initialize_scan(ctx, console)
create_project(ctx, console, Path(project_dir))

answer = 'y' if not prompt_user else None
if prompt_user:
console.print(
"Safety prevents vulnerable or malicious packages from being installed on your computer. We do this by wrapping your package manager.")
prompt = "Do you want to enable proactive malicious package prevention?"
answer = Prompt.ask(prompt=prompt, choices=["y", "n"],
default="y", show_default=True, console=console).lower()

if answer == 'y':
configure_system()

if prompt_user:
prompt = "Do you want to alias pip to Safety?"
answer = Prompt.ask(prompt=prompt, choices=["y", "n"],
default="y", show_default=True, console=console).lower()

if answer == 'y':
configure_alias()

if has_local_tool_files(project_dir):
if prompt_user:
prompt = "Do you want to enable proactive malicious package prevention for any project in working directory?"
answer = Prompt.ask(prompt=prompt, choices=["y", "n"],
default="y", show_default=True, console=console).lower()

if answer == 'y':
configure_local_directory(project_dir)

if prompt_user:
prompt = "It looks like your current directory contains a requirements.txt file. Would you like Safety to scan it?"
answer = Prompt.ask(prompt=prompt, choices=["y", "n"],
default="y", show_default=True, console=console).lower()

if answer == 'y':
ctx.command.name = "scan"
ctx.params = {
"target": directory,
"output": ScanOutput.SCREEN,
"policy_file_path": None
}
scan(ctx=ctx, target=directory, output=ScanOutput.SCREEN, policy_file_path=None)
6 changes: 6 additions & 0 deletions safety/init/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Project options
PROJECT_INIT_CMD_NAME = "init"
PROJECT_INIT_HELP = "Creates new Safety CLI project in the current working directory."\
"\nExample: safety project init"
PROJECT_INIT_DIRECTORY_HELP = "Defines a directory for creating a new project. (default: current directory)\n\n" \
"[bold]Example: safety project init /path/to/project[/bold]"
258 changes: 258 additions & 0 deletions safety/init/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
import typer
from rich.console import Console

from .models import UnverifiedProjectModel

import configparser
from pathlib import Path
from safety_schemas.models import ProjectModel, Stage
from safety.scan.util import GIT
from ..auth.utils import SafetyAuthSession

from typing import Optional
from safety.scan.render import (
print_wait_project_verification,
prompt_project_id,
prompt_link_project,
)

PROJECT_CONFIG = ".safety-project.ini"
PROJECT_CONFIG_SECTION = "project"
PROJECT_CONFIG_ID = "id"
PROJECT_CONFIG_URL = "url"
PROJECT_CONFIG_NAME = "name"


def check_project(
ctx: typer.Context,
session: SafetyAuthSession,
console: Console,
unverified_project: UnverifiedProjectModel,
stage: Stage,
git_origin: Optional[str],
ask_project_id: bool = False,
) -> dict:
"""
Check the project against the session and stage, verifying the project if necessary.
Args:
console: The console for output.
ctx (typer.Context): The context of the Typer command.
session (SafetyAuthSession): The authentication session.
unverified_project (UnverifiedProjectModel): The unverified project model.
stage (Stage): The current stage.
git_origin (Optional[str]): The Git origin URL.
ask_project_id (bool): Whether to prompt for the project ID.
Returns:
dict: The result of the project check.
"""
stage = ctx.obj.auth.stage
source = ctx.obj.telemetry.safety_source if ctx.obj.telemetry else None
data = {"scan_stage": stage, "safety_source": source}

PRJ_SLUG_KEY = "project_slug"
PRJ_SLUG_SOURCE_KEY = "project_slug_source"
PRJ_GIT_ORIGIN_KEY = "git_origin"

if git_origin:
data[PRJ_GIT_ORIGIN_KEY] = git_origin

if unverified_project.id:
data[PRJ_SLUG_KEY] = unverified_project.id
data[PRJ_SLUG_SOURCE_KEY] = ".safety-project.ini"
elif not git_origin or ask_project_id:
# Set a project id for this scan (no spaces). If empty Safety will use: pyupio:
parent_root_name = None
if unverified_project.project_path.parent.name:
parent_root_name = unverified_project.project_path.parent.name

unverified_project.id = prompt_project_id(console, stage, parent_root_name)
data[PRJ_SLUG_KEY] = unverified_project.id
data[PRJ_SLUG_SOURCE_KEY] = "user"

status = print_wait_project_verification(
console,
data[PRJ_SLUG_KEY] if data.get(PRJ_SLUG_KEY, None) else "-",
(session.check_project, data),
on_error_delay=1,
)

return status


def verify_project(
console: Console,
ctx: typer.Context,
session: SafetyAuthSession,
unverified_project: UnverifiedProjectModel,
stage: Stage,
git_origin: Optional[str],
):
"""
Verify the project, linking it if necessary and saving the verified project information.
Args:
console: The console for output.
ctx (typer.Context): The context of the Typer command.
session (SafetyAuthSession): The authentication session.
unverified_project (UnverifiedProjectModel): The unverified project model.
stage (Stage): The current stage.
git_origin (Optional[str]): The Git origin URL.
"""

verified_prj = False

link_prj = True

while not verified_prj:
result = check_project(
ctx,
session,
console,
unverified_project,
stage,
git_origin,
ask_project_id=not link_prj,
)

unverified_slug = result.get("slug")

project = result.get("project", None)
user_confirm = result.get("user_confirm", False)

if user_confirm:
if project and link_prj:
prj_name = project.get("name", None)
prj_admin_email = project.get("admin", None)

link_prj = prompt_link_project(
prj_name=prj_name, prj_admin_email=prj_admin_email, console=console
)

if not link_prj:
continue

verified_prj = print_wait_project_verification(
console,
unverified_slug,
(session.project, {"project_id": unverified_slug}),
on_error_delay=1,
)

if (
verified_prj
and isinstance(verified_prj, dict)
and verified_prj.get("slug", None)
):
save_verified_project(
ctx,
verified_prj["slug"],
verified_prj.get("name", None),
unverified_project.project_path,
verified_prj.get("url", None),
)
else:
verified_prj = False


def load_unverified_project_from_config(project_root: Path) -> UnverifiedProjectModel:
"""
Loads an unverified project from the configuration file located at the project root.
Args:
project_root (Path): The root directory of the project.
Returns:
UnverifiedProjectModel: An instance of UnverifiedProjectModel.
"""
config = configparser.ConfigParser()
project_path = project_root / PROJECT_CONFIG
config.read(project_path)
id = config.get(PROJECT_CONFIG_SECTION, PROJECT_CONFIG_ID, fallback=None)
url = config.get(PROJECT_CONFIG_SECTION, PROJECT_CONFIG_URL, fallback=None)
name = config.get(PROJECT_CONFIG_SECTION, PROJECT_CONFIG_NAME, fallback=None)
created = True
if id:
created = False

return UnverifiedProjectModel(
id=id, url_path=url, name=name, project_path=project_path, created=created
)


def save_verified_project(
ctx: typer.Context,
slug: str,
name: Optional[str],
project_path: Path,
url_path: Optional[str],
):
"""
Save the verified project information to the context and project info file.
Args:
ctx (typer.Context): The context of the Typer command.
slug (str): The project slug.
name (Optional[str]): The project name.
project_path (Path): The project path.
url_path (Optional[str]): The project URL path.
"""
ctx.obj.project = ProjectModel(
id=slug, name=name, project_path=project_path, url_path=url_path
)
if ctx.obj.auth.stage is Stage.development:
save_project_info(project=ctx.obj.project, project_path=project_path)


def save_project_info(project: ProjectModel, project_path: Path) -> None:
"""
Saves the project information to the configuration file.
Args:
project (ProjectModel): The ProjectModel object containing project information.
project_path (Path): The path to the configuration file.
"""
config = configparser.ConfigParser()
config.read(project_path)

if PROJECT_CONFIG_SECTION not in config.sections():
config[PROJECT_CONFIG_SECTION] = {}

config[PROJECT_CONFIG_SECTION][PROJECT_CONFIG_ID] = project.id
if project.url_path:
config[PROJECT_CONFIG_SECTION][PROJECT_CONFIG_URL] = project.url_path
if project.name:
config[PROJECT_CONFIG_SECTION][PROJECT_CONFIG_NAME] = project.name

with open(project_path, "w") as configfile:
config.write(configfile)


def create_project(
ctx: typer.Context, console: Console, target: Path
):
"""
Loads existing project from the specified target locations or creates a new project.
Args:
ctx: The CLI context
session: The authentication session
console: The console object
target (Path): The target location
"""
# Load .safety-project.ini
unverified_project = load_unverified_project_from_config(project_root=target)

stage = ctx.obj.auth.stage
session = ctx.obj.auth.client
git_data = GIT(root=target).build_git_data()
origin = None

if git_data:
origin = git_data.origin

if ctx.obj.platform_enabled:
verify_project(console, ctx, session, unverified_project, stage, origin)
else:
console.print("Project creation is not supported for your account.")
Loading

0 comments on commit df33f3c

Please sign in to comment.