-
-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kubectl command wrapper #172
Changes from all commits
dc283e2
3e59d38
e03b6f1
ee28e60
aecb690
a4351a7
ce64aed
7afad25
0c7ae64
da23afb
cc668a8
779485d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,4 +13,6 @@ leverage.egg-info | |
**/__pycache__* | ||
.pytest_cache | ||
coverage | ||
.coverage | ||
.coverage | ||
|
||
.idea |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,15 @@ | ||
""" | ||
General use utilities. | ||
""" | ||
import functools | ||
from subprocess import run | ||
from subprocess import PIPE | ||
|
||
from click.exceptions import Exit | ||
|
||
from leverage import logger | ||
|
||
|
||
def clean_exception_traceback(exception): | ||
""" Delete special local variables from all frames of an exception's traceback | ||
as to avoid polluting the output when displaying it. | ||
|
@@ -50,3 +56,78 @@ def git(command): | |
command = ["git"] + command if command[0] != "git" else command | ||
|
||
run(command, stdout=PIPE, stderr=PIPE, check=True) | ||
|
||
|
||
def chain_commands(commands: list, chain: str = " && ") -> str: | ||
return f"bash -c \"{chain.join(commands)}\"" | ||
|
||
|
||
class CustomEntryPoint: | ||
""" | ||
Set a custom entrypoint on the container while entering the context. | ||
Once outside, return it to its original value. | ||
""" | ||
|
||
def __init__(self, container, entrypoint): | ||
self.container = container | ||
self.old_entrypoint = container.entrypoint | ||
self.new_entrypoint = entrypoint | ||
|
||
def __enter__(self): | ||
self.container.entrypoint = self.new_entrypoint | ||
|
||
def __exit__(self, *args, **kwargs): | ||
self.container.entrypoint = self.old_entrypoint | ||
|
||
|
||
class EmptyEntryPoint(CustomEntryPoint): | ||
""" | ||
Force an empty entrypoint. This will let you execute any commands freely. | ||
""" | ||
|
||
def __init__(self, container): | ||
super(EmptyEntryPoint, self).__init__(container, entrypoint="") | ||
|
||
|
||
def refresh_aws_credentials(func): | ||
""" | ||
Use this decorator in the case you want to make sure you will have fresh tokens to interact with AWS | ||
during the execution of your wrapped method. | ||
""" | ||
@functools.wraps(func) | ||
def wrapper(*args, **kwargs): | ||
container = args[0] # this is the "self" of the method you are decorating; a LeverageContainer instance | ||
|
||
if container.sso_enabled: | ||
container._check_sso_token() | ||
auth_method = container.TF_SSO_ENTRYPOINT | ||
elif container.mfa_enabled: | ||
auth_method = container.TF_MFA_ENTRYPOINT | ||
# TODO: ask why this was necessary | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @angelofenoglio @juanmatias maybe you can clarify me this guys? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Considering this: https://github.com/binbashar/leverage/blob/master/leverage/container.py#L413-L416 |
||
container.environment.update({ | ||
"AWS_SHARED_CREDENTIALS_FILE": container.environment["AWS_SHARED_CREDENTIALS_FILE"].replace("tmp", ".aws"), | ||
"AWS_CONFIG_FILE": container.environment["AWS_CONFIG_FILE"].replace("tmp", ".aws"), | ||
}) | ||
else: | ||
# no auth method found: skip the refresh | ||
return func(*args, **kwargs) | ||
|
||
logger.info("Fetching AWS credentials...") | ||
with CustomEntryPoint(container, f"{auth_method} -- echo"): | ||
# this simple echo "Fetching..." will run the SSO/MFA entrypoints underneath | ||
# that takes care of the token refresh | ||
exit_code = container._start("Fetching done.") | ||
if exit_code: | ||
raise Exit(exit_code) | ||
if container.mfa_enabled: | ||
# we need to revert to the original values, otherwise other tools that rely on awscli, like kubectl | ||
# won't find the credentials | ||
container.environment.update({ | ||
"AWS_SHARED_CREDENTIALS_FILE": container.environment["AWS_SHARED_CREDENTIALS_FILE"].replace(".aws", "tmp"), | ||
"AWS_CONFIG_FILE": container.environment["AWS_CONFIG_FILE"].replace(".aws", "tmp"), | ||
}) | ||
|
||
# we should have a valid token at this point, now execute the original method | ||
return func(*args, **kwargs) | ||
|
||
return wrapper |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
import os | ||
import pwd | ||
from pathlib import Path | ||
|
||
from click.exceptions import Exit | ||
from docker.types import Mount | ||
|
||
from leverage import logger | ||
from leverage._utils import chain_commands, EmptyEntryPoint, refresh_aws_credentials | ||
from leverage.container import TerraformContainer | ||
|
||
|
||
class KubeCtlContainer(TerraformContainer): | ||
"""Container specifically tailored to run kubectl commands.""" | ||
|
||
KUBECTL_CLI_BINARY = "/usr/local/bin/kubectl" | ||
KUBECTL_CONFIG_PATH = Path("/root/.kube") | ||
KUBECTL_CONFIG_FILE = KUBECTL_CONFIG_PATH / Path("config") | ||
|
||
def __init__(self, client): | ||
super().__init__(client) | ||
|
||
self.entrypoint = self.KUBECTL_CLI_BINARY | ||
|
||
host_config_path = str(Path.home() / Path(f".kube/{self.project}")) | ||
self.container_config["host_config"]["Mounts"].append( | ||
# the container is expecting a file named "config" here | ||
Mount( | ||
source=host_config_path, | ||
target=str(self.KUBECTL_CONFIG_PATH), | ||
type="bind", | ||
) | ||
) | ||
|
||
@refresh_aws_credentials | ||
def start_shell(self): | ||
with EmptyEntryPoint(self): | ||
self._start() | ||
|
||
@refresh_aws_credentials | ||
def configure(self): | ||
# make sure we are on the cluster layer | ||
self.check_for_layer_location() | ||
|
||
logger.info("Retrieving k8s cluster information...") | ||
with EmptyEntryPoint(self): | ||
# generate the command that will configure the new cluster | ||
add_eks_cluster_cmd = self._get_eks_kube_config() | ||
# and the command that will set the proper ownership on the config file (otherwise the owner will be "root") | ||
change_owner_cmd = self._change_kube_file_owner_cmd() | ||
full_cmd = chain_commands([add_eks_cluster_cmd, change_owner_cmd]) | ||
|
||
logger.info("Configuring context...") | ||
with EmptyEntryPoint(self): | ||
# we use _start here because in the case of MFA it will ask for the token | ||
exit_code = self._start(full_cmd) | ||
if exit_code: | ||
raise Exit(exit_code) | ||
|
||
logger.info("Done.") | ||
|
||
def _get_eks_kube_config(self) -> str: | ||
exit_code, output = self._exec(f"{self.TF_BINARY} output") | ||
if exit_code: | ||
logger.error(output) | ||
raise Exit(exit_code) | ||
|
||
aws_eks_cmd = next(op for op in output.split("\n") if op.startswith("aws eks update-kubeconfig")) | ||
# assuming the cluster container is on the primary region | ||
return aws_eks_cmd + f" --region {self.common_conf['region_primary']}" | ||
|
||
def _get_user_group_id(self, user_id) -> int: | ||
user = pwd.getpwuid(user_id) | ||
return user.pw_gid | ||
|
||
def _change_kube_file_owner_cmd(self) -> str: | ||
user_id = os.getuid() | ||
group_id = self._get_user_group_id(user_id) | ||
|
||
return f"chown {user_id}:{group_id} {self.KUBECTL_CONFIG_FILE}" | ||
|
||
def check_for_layer_location(self): | ||
super(KubeCtlContainer, self).check_for_layer_location() | ||
# assuming the "cluster" layer will contain the expected EKS outputs | ||
if self.cwd.parts[-1] != "cluster": | ||
logger.error("This command can only run at the [bold]cluster layer[/bold].") | ||
raise Exit(1) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
from leverage._internals import pass_state | ||
from leverage._internals import pass_container | ||
from leverage.container import get_docker_client | ||
from leverage.containers.kubectl import KubeCtlContainer | ||
|
||
import click | ||
|
||
from leverage.modules.utils import _handle_subcommand | ||
|
||
CONTEXT_SETTINGS = {"ignore_unknown_options": True} | ||
|
||
|
||
@click.group(invoke_without_command=True, context_settings={"ignore_unknown_options": True}) | ||
@click.argument("args", nargs=-1, type=click.UNPROCESSED) | ||
@pass_state | ||
@click.pass_context | ||
def kubectl(context, state, args): | ||
"""Run Kubectl commands in a custom containerized environment.""" | ||
state.container = KubeCtlContainer(get_docker_client()) | ||
state.container.ensure_image() | ||
_handle_subcommand(context=context, cli_container=state.container, args=args) | ||
|
||
|
||
@kubectl.command(context_settings=CONTEXT_SETTINGS) | ||
@pass_container | ||
def shell(kctl: KubeCtlContainer): | ||
"""Spawn a shell with the kubectl credentials pre-configured.""" | ||
kctl.start_shell() | ||
|
||
|
||
@kubectl.command(context_settings=CONTEXT_SETTINGS) | ||
@pass_container | ||
def configure(kctl: KubeCtlContainer): | ||
"""Automatically add the EKS cluster from the layer into your kubectl config file.""" | ||
kctl.configure() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
from click.exceptions import Exit | ||
|
||
|
||
def _handle_subcommand(context, cli_container, args, caller_name=None): | ||
"""Decide if command corresponds to a wrapped one or not and run accordingly. | ||
Args: | ||
context (click.context): Current context | ||
cli_container (LeverageContainer): Container where commands will be executed | ||
args (tuple(str)): Arguments received by Leverage | ||
caller_name (str, optional): Calling command. Defaults to None. | ||
Raises: | ||
Exit: Whenever container execution returns a non-zero exit code | ||
""" | ||
caller_pos = args.index(caller_name) if caller_name is not None else 0 | ||
|
||
# Find if one of the wrapped subcommand was invoked | ||
wrapped_subcommands = context.command.commands.keys() | ||
subcommand = next((arg for arg in args[caller_pos:] if arg in wrapped_subcommands), None) | ||
|
||
if subcommand is None: | ||
# Pass command to the container directly | ||
exit_code = cli_container.start(" ".join(args)) | ||
if not exit_code: | ||
raise Exit(exit_code) | ||
|
||
else: | ||
# Invoke wrapped command | ||
subcommand = context.command.commands.get(subcommand) | ||
if not subcommand.params: | ||
context.invoke(subcommand) | ||
else: | ||
context.forward(subcommand) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Love the use of context managers, very elegant solution!