Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kubectl command wrapper #172

Merged
merged 12 commits into from
Mar 20, 2023
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ leverage.egg-info
**/__pycache__*
.pytest_cache
coverage
.coverage
.coverage

.idea
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ So, if you have created a project with version <1.8.0 and want to use it with ve

For the second item you can check the version [here](https://hub.docker.com/r/binbash/leverage-toolbox/tags).

## Setting up development environment

First, you should create a virtual environment and install all the required dependencies by running: `pipenv install --dev`.

If you don't have `pipenv` in your system, you can check the following documentation: https://pipenv.pypa.io/en/latest/install/#installing-pipenv

Once you have everything in place, install the CLI as an editable package inside the virtual environment: `pipenv install -e .`

This way, the `leverage` command on your venv will be executed from the project folder, using it as the source.

Now all the changes to the project will be immediately reflected on the command.

## Running Tests
To run unit tests, pytest is the tool of choice, and the required dependencies are available in the corresponding `dev-requirements.txt`.

Expand Down
31 changes: 31 additions & 0 deletions leverage/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,34 @@ def git(command):
command = ["git"] + command if command[0] != "git" else command

run(command, stdout=PIPE, stderr=PIPE, check=True)


def chain_commands(commands: list, chain: str = " && ") -> str:
return f"bash -c \"{chain.join(commands)}\""


class CustomEntryPoint:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love the use of context managers, very elegant solution!

"""
Set a custom entrypoint on the container while entering the context.
Once outside, return it to its original value.
"""

def __init__(self, container, entrypoint):
self.container = container
self.old_entrypoint = container.entrypoint
self.new_entrypoint = entrypoint

def __enter__(self):
self.container.entrypoint = self.new_entrypoint

def __exit__(self, *args, **kwargs):
self.container.entrypoint = self.old_entrypoint


class EmptyEntryPoint(CustomEntryPoint):
"""
Force an empty entrypoint. This will let you execute any commands freely.
"""

def __init__(self, container):
super(EmptyEntryPoint, self).__init__(container, entrypoint="")
2 changes: 0 additions & 2 deletions leverage/container.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
from pathlib import Path
from datetime import datetime
from datetime import timedelta

import hcl2
from click.exceptions import Exit
Expand Down Expand Up @@ -699,4 +698,3 @@ def exec(self, command, *arguments):
self._prepare_container()

return self._exec(command, *arguments)

80 changes: 80 additions & 0 deletions leverage/containers/kubectl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import os
import pwd
from pathlib import Path

from click.exceptions import Exit
from docker.types import Mount

from leverage import logger
from leverage._utils import chain_commands, EmptyEntryPoint
from leverage.container import TerraformContainer


class KubeCtlContainer(TerraformContainer):
"""Container specifically tailored to run kubectl commands."""

KUBECTL_CLI_BINARY = "/usr/local/bin/kubectl"
KUBECTL_CONFIG_PATH = Path("/root/.kube")
KUBECTL_CONFIG_FILE = KUBECTL_CONFIG_PATH / Path("config")

def __init__(self, client):
super().__init__(client)

self.entrypoint = self.KUBECTL_CLI_BINARY

host_config_path = str(Path.home() / Path(f".kube/{self.project}"))
self.container_config["host_config"]["Mounts"].append(
# the container is expecting a file named "config" here
Mount(
source=host_config_path,
target=str(self.KUBECTL_CONFIG_PATH),
type="bind",
)
)

def start_shell(self):
with EmptyEntryPoint(self):
self._start()

def configure(self):
logger.info("Retrieving k8s cluster information...")
with EmptyEntryPoint(self):
add_eks_cluster_cmd = self._get_eks_kube_config()

# generate the command that will: configure the new cluster, and also set the proper user on the new config file
change_owner_cmd = self._change_kube_file_owner_cmd()
full_cmd = chain_commands([add_eks_cluster_cmd, change_owner_cmd])

logger.info("Configuring context...")
with EmptyEntryPoint(self):
exit_code, output = self._exec(full_cmd)
if exit_code:
logger.error(output)
raise Exit(exit_code)

logger.info("Done.")

def _get_eks_kube_config(self) -> str:
self.check_for_layer_location()

exit_code, output = self._exec(f"{self.TF_BINARY} output")
if exit_code:
logger.error(output)
raise Exit(exit_code)

aws_eks_cmd = output.split("\n")[10]
# assuming the cluster container is on the primary region
return aws_eks_cmd + f" --region {self.common_conf['region_primary']}"

def _change_kube_file_owner_cmd(self) -> str:
user_id = os.getuid()
user = pwd.getpwuid(user_id)
group_id = user.pw_gid

return f"chown {user_id}:{group_id} {self.KUBECTL_CONFIG_FILE}"

def check_for_layer_location(self):
super(KubeCtlContainer, self).check_for_layer_location()
if self.cwd.parts[-1] != "cluster":
logger.error("This command can only run at the [bold]cluster layer[/bold].")
raise Exit(1)
4 changes: 4 additions & 0 deletions leverage/leverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from leverage.modules import terraform
from leverage.modules import credentials
from leverage.modules import tfautomv
from leverage.modules import kubectl


@click.group(invoke_without_command=True)
@click.option("--filename", "-f",
Expand Down Expand Up @@ -55,3 +57,5 @@ def leverage(context, state, filename, list_tasks, verbose):
leverage.add_command(credentials)
leverage.add_command(aws)
leverage.add_command(tfautomv)
leverage.add_command(kubectl)
leverage.add_command(kubectl, name="kc")
1 change: 1 addition & 0 deletions leverage/modules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
from .terraform import terraform
from .credentials import credentials
from .tfautomv import tfautomv
from .kubectl import kubectl
37 changes: 1 addition & 36 deletions leverage/modules/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,7 @@
from leverage._internals import pass_container
from leverage.container import get_docker_client
from leverage.container import AWSCLIContainer


def _handle_subcommand(context, cli_container, args, caller_name=None):
""" Decide if command corresponds to a wrapped one or not and run accordingly.

Args:
context (click.context): Current context
cli_container (AWSCLIContainer): Container where commands will be executed
args (tuple(str)): Arguments received by Leverage
caller_name (str, optional): Calling command. Defaults to None.

Raises:
Exit: Whenever container execution returns a non zero exit code
"""
caller_pos = args.index(caller_name) if caller_name is not None else 0

# Find if one of the wrapped subcommand was invoked
wrapped_subcommands = context.command.commands.keys()
subcommand = next((arg
for arg in args[caller_pos:]
if arg in wrapped_subcommands), None)

if subcommand is None:
# Pass command to aws cli directly
exit_code = cli_container.start(" ".join(args))
if not exit_code:
raise Exit(exit_code)

else:
# Invoke wrapped command
subcommand = context.command.commands.get(subcommand)
if not subcommand.params:
context.invoke(subcommand)
else:
context.forward(subcommand)

from leverage.modules.utils import _handle_subcommand

CONTEXT_SETTINGS={
"ignore_unknown_options": True
Expand Down
34 changes: 34 additions & 0 deletions leverage/modules/kubectl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from leverage._internals import pass_state
from leverage._internals import pass_container
from leverage.container import get_docker_client
from leverage.containers.kubectl import KubeCtlContainer

import click

from leverage.modules.utils import _handle_subcommand

CONTEXT_SETTINGS = {"ignore_unknown_options": True}


@click.group(invoke_without_command=True, context_settings={"ignore_unknown_options": True})
@click.argument("args", nargs=-1, type=click.UNPROCESSED)
@pass_state
@click.pass_context
def kubectl(context, state, args):
"""Run Kubectl commands in a custom containerized environment."""
state.container = KubeCtlContainer(get_docker_client())
state.container.ensure_image()
_handle_subcommand(context=context, cli_container=state.container, args=args)


@kubectl.command(context_settings=CONTEXT_SETTINGS)
@pass_container
def shell(kctl: KubeCtlContainer):
"""Spawn a shell with the kubectl credentials pre-configured."""
kctl.start_shell()


@kubectl.command(context_settings=CONTEXT_SETTINGS)
@pass_container
def configure(kctl: KubeCtlContainer):
kctl.configure()
34 changes: 34 additions & 0 deletions leverage/modules/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from click.exceptions import Exit


def _handle_subcommand(context, cli_container, args, caller_name=None):
"""Decide if command corresponds to a wrapped one or not and run accordingly.

Args:
context (click.context): Current context
cli_container (LeverageContainer): Container where commands will be executed
args (tuple(str)): Arguments received by Leverage
caller_name (str, optional): Calling command. Defaults to None.

Raises:
Exit: Whenever container execution returns a non-zero exit code
"""
caller_pos = args.index(caller_name) if caller_name is not None else 0

# Find if one of the wrapped subcommand was invoked
wrapped_subcommands = context.command.commands.keys()
subcommand = next((arg for arg in args[caller_pos:] if arg in wrapped_subcommands), None)

if subcommand is None:
# Pass command to the container directly
exit_code = cli_container.start(" ".join(args))
if not exit_code:
raise Exit(exit_code)

else:
# Invoke wrapped command
subcommand = context.command.commands.get(subcommand)
if not subcommand.params:
context.invoke(subcommand)
else:
context.forward(subcommand)