kubectl command wrapper (#172)
* kubectl command wrapper

* EOL

* get fresh credentials when spawning the shell too

* no need to override these methods

* development environment explanation

* kc alias

* configure command

* refresh credentials on each run

* don't assume the position of the aws eks command

* refresh_aws_credentials to support both SSO and MFA

* tests!

* doc for the command
Franr authored Mar 20, 2023
1 parent e28f22a commit 1d96739
Showing 12 changed files with 377 additions and 39 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -13,4 +13,6 @@ leverage.egg-info
**/__pycache__*
.pytest_cache
coverage
.coverage
.coverage

.idea
12 changes: 12 additions & 0 deletions README.md
@@ -49,6 +49,18 @@ So, if you have created a project with version <1.8.0 and want to use it with ve

For the second item you can check the version [here](https://hub.docker.com/r/binbash/leverage-toolbox/tags).

## Setting up development environment

First, you should create a virtual environment and install all the required dependencies by running: `pipenv install --dev`.

If you don't have `pipenv` installed on your system, you can follow the installation documentation: https://pipenv.pypa.io/en/latest/install/#installing-pipenv

Once you have everything in place, install the CLI as an editable package inside the virtual environment: `pipenv install -e .`

This way, the `leverage` command in your virtual environment will be executed from the project folder, using it as its source.

From then on, any changes to the project will be immediately reflected in the command.

## Running Tests
To run unit tests, pytest is the tool of choice, and the required dependencies are available in the corresponding `dev-requirements.txt`.

81 changes: 81 additions & 0 deletions leverage/_utils.py
@@ -1,9 +1,15 @@
"""
General use utilities.
"""
import functools
from subprocess import run
from subprocess import PIPE

from click.exceptions import Exit

from leverage import logger


def clean_exception_traceback(exception):
""" Delete special local variables from all frames of an exception's traceback
as to avoid polluting the output when displaying it.
@@ -50,3 +56,78 @@ def git(command):
command = ["git"] + command if command[0] != "git" else command

run(command, stdout=PIPE, stderr=PIPE, check=True)


def chain_commands(commands: list, chain: str = " && ") -> str:
    """Merge the given commands into a single bash invocation."""
    return f"bash -c \"{chain.join(commands)}\""


class CustomEntryPoint:
    """
    Set a custom entrypoint on the container while entering the context.
    Once outside, return it to its original value.
    """

    def __init__(self, container, entrypoint):
        self.container = container
        self.old_entrypoint = container.entrypoint
        self.new_entrypoint = entrypoint

    def __enter__(self):
        self.container.entrypoint = self.new_entrypoint

    def __exit__(self, *args, **kwargs):
        self.container.entrypoint = self.old_entrypoint


class EmptyEntryPoint(CustomEntryPoint):
    """
    Force an empty entrypoint. This will let you execute any command freely.
    """

    def __init__(self, container):
        super(EmptyEntryPoint, self).__init__(container, entrypoint="")


def refresh_aws_credentials(func):
    """
    Use this decorator when you need to make sure you will have fresh tokens to interact with AWS
    during the execution of the wrapped method.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        container = args[0]  # this is the "self" of the method you are decorating; a LeverageContainer instance

        if container.sso_enabled:
            container._check_sso_token()
            auth_method = container.TF_SSO_ENTRYPOINT
        elif container.mfa_enabled:
            auth_method = container.TF_MFA_ENTRYPOINT
            # TODO: ask why this was necessary
            container.environment.update({
                "AWS_SHARED_CREDENTIALS_FILE": container.environment["AWS_SHARED_CREDENTIALS_FILE"].replace("tmp", ".aws"),
                "AWS_CONFIG_FILE": container.environment["AWS_CONFIG_FILE"].replace("tmp", ".aws"),
            })
        else:
            # no auth method found: skip the refresh
            return func(*args, **kwargs)

        logger.info("Fetching AWS credentials...")
        with CustomEntryPoint(container, f"{auth_method} -- echo"):
            # this simple echo ("Fetching done.") runs the SSO/MFA entrypoint underneath,
            # which takes care of the token refresh
            exit_code = container._start("Fetching done.")
        if exit_code:
            raise Exit(exit_code)
        if container.mfa_enabled:
            # we need to revert to the original values, otherwise other tools that rely on awscli (like kubectl)
            # won't find the credentials
            container.environment.update({
                "AWS_SHARED_CREDENTIALS_FILE": container.environment["AWS_SHARED_CREDENTIALS_FILE"].replace(".aws", "tmp"),
                "AWS_CONFIG_FILE": container.environment["AWS_CONFIG_FILE"].replace(".aws", "tmp"),
            })

        # we should have a valid token at this point, now execute the original method
        return func(*args, **kwargs)

    return wrapper
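
For readers unfamiliar with these helpers, here is a minimal usage sketch of the entrypoint context managers. The `FakeContainer` class is hypothetical and only mimics the `entrypoint` attribute of a real `LeverageContainer`; the `refresh_aws_credentials` decorator itself is exercised by the `KubeCtlContainer` methods further down.

```python
from leverage._utils import EmptyEntryPoint


class FakeContainer:
    """Hypothetical stand-in for a LeverageContainer instance."""
    entrypoint = "/bin/terraform"


container = FakeContainer()

with EmptyEntryPoint(container):
    # inside the block the entrypoint is cleared, so any command can be run freely
    assert container.entrypoint == ""

# once outside, the original entrypoint is restored
assert container.entrypoint == "/bin/terraform"
```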
2 changes: 0 additions & 2 deletions leverage/container.py
@@ -1,7 +1,6 @@
import json
from pathlib import Path
from datetime import datetime
from datetime import timedelta

import hcl2
from click.exceptions import Exit
@@ -699,4 +698,3 @@ def exec(self, command, *arguments):
        self._prepare_container()

        return self._exec(command, *arguments)

87 changes: 87 additions & 0 deletions leverage/containers/kubectl.py
@@ -0,0 +1,87 @@
import os
import pwd
from pathlib import Path

from click.exceptions import Exit
from docker.types import Mount

from leverage import logger
from leverage._utils import chain_commands, EmptyEntryPoint, refresh_aws_credentials
from leverage.container import TerraformContainer


class KubeCtlContainer(TerraformContainer):
    """Container specifically tailored to run kubectl commands."""

    KUBECTL_CLI_BINARY = "/usr/local/bin/kubectl"
    KUBECTL_CONFIG_PATH = Path("/root/.kube")
    KUBECTL_CONFIG_FILE = KUBECTL_CONFIG_PATH / Path("config")

    def __init__(self, client):
        super().__init__(client)

        self.entrypoint = self.KUBECTL_CLI_BINARY

        host_config_path = str(Path.home() / Path(f".kube/{self.project}"))
        self.container_config["host_config"]["Mounts"].append(
            # the container expects a file named "config" here
            Mount(
                source=host_config_path,
                target=str(self.KUBECTL_CONFIG_PATH),
                type="bind",
            )
        )

    @refresh_aws_credentials
    def start_shell(self):
        with EmptyEntryPoint(self):
            self._start()

    @refresh_aws_credentials
    def configure(self):
        # make sure we are on the cluster layer
        self.check_for_layer_location()

        logger.info("Retrieving k8s cluster information...")
        with EmptyEntryPoint(self):
            # generate the command that will configure the new cluster
            add_eks_cluster_cmd = self._get_eks_kube_config()
            # and the command that will set the proper ownership on the config file (otherwise the owner will be "root")
            change_owner_cmd = self._change_kube_file_owner_cmd()
            full_cmd = chain_commands([add_eks_cluster_cmd, change_owner_cmd])

        logger.info("Configuring context...")
        with EmptyEntryPoint(self):
            # we use _start here because in the case of MFA it will ask for the token
            exit_code = self._start(full_cmd)
        if exit_code:
            raise Exit(exit_code)

        logger.info("Done.")

    def _get_eks_kube_config(self) -> str:
        exit_code, output = self._exec(f"{self.TF_BINARY} output")
        if exit_code:
            logger.error(output)
            raise Exit(exit_code)

        aws_eks_cmd = next(op for op in output.split("\n") if op.startswith("aws eks update-kubeconfig"))
        # assuming the cluster is deployed in the primary region
        return aws_eks_cmd + f" --region {self.common_conf['region_primary']}"

    def _get_user_group_id(self, user_id) -> int:
        user = pwd.getpwuid(user_id)
        return user.pw_gid

    def _change_kube_file_owner_cmd(self) -> str:
        user_id = os.getuid()
        group_id = self._get_user_group_id(user_id)

        return f"chown {user_id}:{group_id} {self.KUBECTL_CONFIG_FILE}"

    def check_for_layer_location(self):
        super(KubeCtlContainer, self).check_for_layer_location()
        # assuming the "cluster" layer will contain the expected EKS outputs
        if self.cwd.parts[-1] != "cluster":
            logger.error("This command can only run at the [bold]cluster layer[/bold].")
            raise Exit(1)
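
To make the `configure` flow concrete, this is roughly the command string it assembles and hands to `_start`. The cluster name, region and uid/gid below are hypothetical; the real values come from `terraform output`, the layer's `region_primary` setting and `os.getuid`.

```python
from leverage._utils import chain_commands

# hypothetical values, for illustration only
add_eks_cluster_cmd = "aws eks update-kubeconfig --name apps-devstg-eks --region us-east-1"
change_owner_cmd = "chown 1000:1000 /root/.kube/config"

print(chain_commands([add_eks_cluster_cmd, change_owner_cmd]))
# bash -c "aws eks update-kubeconfig --name apps-devstg-eks --region us-east-1 && chown 1000:1000 /root/.kube/config"
```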
4 changes: 4 additions & 0 deletions leverage/leverage.py
Original file line number Diff line number Diff line change
@@ -14,6 +14,8 @@
from leverage.modules import terraform
from leverage.modules import credentials
from leverage.modules import tfautomv
from leverage.modules import kubectl


@click.group(invoke_without_command=True)
@click.option("--filename", "-f",
@@ -55,3 +57,5 @@ def leverage(context, state, filename, list_tasks, verbose):
leverage.add_command(credentials)
leverage.add_command(aws)
leverage.add_command(tfautomv)
leverage.add_command(kubectl)
leverage.add_command(kubectl, name="kc")
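
Assuming the package is installed (for instance via the editable install described in the README), a quick sanity check that `kc` is only an alias: both names resolve to the same click group object.

```python
from leverage.leverage import leverage

# add_command(kubectl, name="kc") registers the same group under a second name
assert leverage.commands["kc"] is leverage.commands["kubectl"]
```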
1 change: 1 addition & 0 deletions leverage/modules/__init__.py
@@ -4,3 +4,4 @@
from .terraform import terraform
from .credentials import credentials
from .tfautomv import tfautomv
from .kubectl import kubectl
37 changes: 1 addition & 36 deletions leverage/modules/aws.py
@@ -8,42 +8,7 @@
from leverage._internals import pass_container
from leverage.container import get_docker_client
from leverage.container import AWSCLIContainer


def _handle_subcommand(context, cli_container, args, caller_name=None):
    """ Decide if command corresponds to a wrapped one or not and run accordingly.
    Args:
        context (click.context): Current context
        cli_container (AWSCLIContainer): Container where commands will be executed
        args (tuple(str)): Arguments received by Leverage
        caller_name (str, optional): Calling command. Defaults to None.
    Raises:
        Exit: Whenever container execution returns a non zero exit code
    """
    caller_pos = args.index(caller_name) if caller_name is not None else 0

    # Find if one of the wrapped subcommand was invoked
    wrapped_subcommands = context.command.commands.keys()
    subcommand = next((arg
                       for arg in args[caller_pos:]
                       if arg in wrapped_subcommands), None)

    if subcommand is None:
        # Pass command to aws cli directly
        exit_code = cli_container.start(" ".join(args))
        if not exit_code:
            raise Exit(exit_code)

    else:
        # Invoke wrapped command
        subcommand = context.command.commands.get(subcommand)
        if not subcommand.params:
            context.invoke(subcommand)
        else:
            context.forward(subcommand)

from leverage.modules.utils import _handle_subcommand

CONTEXT_SETTINGS={
    "ignore_unknown_options": True
35 changes: 35 additions & 0 deletions leverage/modules/kubectl.py
@@ -0,0 +1,35 @@
from leverage._internals import pass_state
from leverage._internals import pass_container
from leverage.container import get_docker_client
from leverage.containers.kubectl import KubeCtlContainer

import click

from leverage.modules.utils import _handle_subcommand

CONTEXT_SETTINGS = {"ignore_unknown_options": True}


@click.group(invoke_without_command=True, context_settings={"ignore_unknown_options": True})
@click.argument("args", nargs=-1, type=click.UNPROCESSED)
@pass_state
@click.pass_context
def kubectl(context, state, args):
    """Run kubectl commands in a custom containerized environment."""
    state.container = KubeCtlContainer(get_docker_client())
    state.container.ensure_image()
    _handle_subcommand(context=context, cli_container=state.container, args=args)


@kubectl.command(context_settings=CONTEXT_SETTINGS)
@pass_container
def shell(kctl: KubeCtlContainer):
    """Spawn a shell with the kubectl credentials pre-configured."""
    kctl.start_shell()


@kubectl.command(context_settings=CONTEXT_SETTINGS)
@pass_container
def configure(kctl: KubeCtlContainer):
    """Automatically add the EKS cluster from the layer into your kubectl config file."""
    kctl.configure()
34 changes: 34 additions & 0 deletions leverage/modules/utils.py
@@ -0,0 +1,34 @@
from click.exceptions import Exit


def _handle_subcommand(context, cli_container, args, caller_name=None):
    """Decide if command corresponds to a wrapped one or not and run accordingly.
    Args:
        context (click.context): Current context
        cli_container (LeverageContainer): Container where commands will be executed
        args (tuple(str)): Arguments received by Leverage
        caller_name (str, optional): Calling command. Defaults to None.
    Raises:
        Exit: Whenever container execution returns a non-zero exit code
    """
    caller_pos = args.index(caller_name) if caller_name is not None else 0

    # Find out whether one of the wrapped subcommands was invoked
    wrapped_subcommands = context.command.commands.keys()
    subcommand = next((arg for arg in args[caller_pos:] if arg in wrapped_subcommands), None)

    if subcommand is None:
        # Pass the command to the container directly
        exit_code = cli_container.start(" ".join(args))
        if not exit_code:
            raise Exit(exit_code)

    else:
        # Invoke the wrapped command
        subcommand = context.command.commands.get(subcommand)
        if not subcommand.params:
            context.invoke(subcommand)
        else:
            context.forward(subcommand)
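
For context, here is a self-contained toy (not part of this commit) showing the dispatch pattern that both the `aws` and the new `kubectl` groups rely on: anything matching a registered subcommand is invoked through click, everything else falls through to the container's `start`. The `EchoContainer` and the `demo` group are hypothetical stand-ins.

```python
import click

from leverage.modules.utils import _handle_subcommand


class EchoContainer:
    """Hypothetical stand-in for a LeverageContainer: it only echoes the command."""

    def start(self, command):
        click.echo(f"(container) {command}")
        return 0


@click.group(invoke_without_command=True, context_settings={"ignore_unknown_options": True})
@click.argument("args", nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def demo(context, args):
    # the catch-all UNPROCESSED argument swallows everything, so dispatch is done manually
    _handle_subcommand(context=context, cli_container=EchoContainer(), args=args)


@demo.command()
def version():
    """A wrapped subcommand handled by click instead of being passed through."""
    click.echo("wrapped version command")


if __name__ == "__main__":
    demo()  # e.g. `python demo.py get pods` -> passthrough, `python demo.py version` -> wrapped
```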
6 changes: 6 additions & 0 deletions tests/conftest.py
@@ -44,3 +44,9 @@ def with_click_context(click_context):
    the need of a `with` statement. """
    with click_context():
        yield


@pytest.fixture
def muted_click_context(click_context):
    """ Same as `click_context` but with verbosity disabled. """
    with click_context(verbose=False):
        yield