Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Annotate all the code with the types #200

Merged
merged 9 commits into from
Oct 7, 2019
Merged
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ coverage.xml
junit.xml
*.cover
.hypothesis/
.mypy_cache

# Documentation
docs/_build
Expand Down
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ script:
- coveralls
- codecov --flags unit
- pytest -v --only-e2e # NB: after the coverage uploads!
- mypy kopf --strict --pretty

deploy:
provider: pypi
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

intersphinx_mapping = {
'python': ('https://docs.python.org/3', None),
'mypy': ('https://mypy.readthedocs.io/en/latest/', None),
}


Expand Down
14 changes: 5 additions & 9 deletions docs/configuring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,17 @@ Configure logging events
========================

`kopf.config.EventsConfig` allows to set what types of kopf logs should be
reflected in events.

Loglevels are:

* ``kopf.config.LOGLEVEL_INFO``
* ``kopf.config.LOGLEVEL_WARNING``
* ``kopf.config.LOGLEVEL_ERROR``
* ``kopf.config.LOGLEVEL_CRITICAL``
reflected in events. Use `logging` constants or integer values to set the level:
e.g., `logging.WARNING`, `logging.ERROR`, etc. The default is `logging.INFO`.

.. code-block:: python

import logging
import kopf

# Now kopf will send events only when error or critical occasion happens
kopf.EventsConfig.events_loglevel = config.LOGLEVEL_ERROR
kopf.EventsConfig.events_loglevel = logging.ERROR


Configure Workers
=================
Expand Down
39 changes: 29 additions & 10 deletions kopf/cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import functools
from typing import Any, Optional, Callable, List

import click

Expand All @@ -10,7 +11,7 @@
from kopf.utilities import loaders


def cli_login():
def cli_login() -> None:
try:
auth.login(verify=True)
except auth.LoginError as e:
Expand All @@ -19,13 +20,13 @@ def cli_login():
raise click.ClickException(str(e))


def logging_options(fn):
def logging_options(fn: Callable[..., Any]) -> Callable[..., Any]:
""" A decorator to configure logging in all command in the same way."""
@click.option('-v', '--verbose', is_flag=True)
@click.option('-d', '--debug', is_flag=True)
@click.option('-q', '--quiet', is_flag=True)
@functools.wraps(fn) # to preserve other opts/args
def wrapper(verbose, quiet, debug, *args, **kwargs):
def wrapper(verbose: bool, quiet: bool, debug: bool, *args: Any, **kwargs: Any) -> Any:
config.configure(debug=debug, verbose=verbose, quiet=quiet)
return fn(*args, **kwargs)

Expand All @@ -36,7 +37,7 @@ def wrapper(verbose, quiet, debug, *args, **kwargs):
@click.group(name='kopf', context_settings=dict(
auto_envvar_prefix='KOPF',
))
def main():
def main() -> None:
pass


Expand All @@ -49,7 +50,14 @@ def main():
@click.option('-p', '--priority', type=int, default=0)
@click.option('-m', '--module', 'modules', multiple=True)
@click.argument('paths', nargs=-1)
def run(paths, modules, peering_name, priority, standalone, namespace):
def run(
paths: List[str],
modules: List[str],
peering_name: Optional[str],
priority: int,
standalone: bool,
namespace: Optional[str],
) -> None:
""" Start an operator process and handle all the requests. """
cli_login()
loaders.preload(
Expand All @@ -69,11 +77,18 @@ def run(paths, modules, peering_name, priority, standalone, namespace):
@click.option('-n', '--namespace', default=None)
@click.option('-i', '--id', type=str, default=None)
@click.option('--dev', 'priority', flag_value=666)
@click.option('-P', '--peering', 'peering_name', type=str, default=None, envvar='KOPF_FREEZE_PEERING')
@click.option('-p', '--priority', type=int, default=100)
@click.option('-P', '--peering', 'peering_name', type=str, required=True, envvar='KOPF_FREEZE_PEERING')
thilp marked this conversation as resolved.
Show resolved Hide resolved
@click.option('-p', '--priority', type=int, default=100, required=True)
@click.option('-t', '--lifetime', type=int, required=True)
@click.option('-m', '--message', type=str)
def freeze(id, message, lifetime, namespace, peering_name, priority):
def freeze(
id: Optional[str],
message: Optional[str],
lifetime: int,
namespace: Optional[str],
peering_name: str,
priority: int,
) -> None:
""" Freeze the resource handling in the cluster. """
cli_login()
ourserlves = peering.Peer(
Expand All @@ -91,8 +106,12 @@ def freeze(id, message, lifetime, namespace, peering_name, priority):
@logging_options
@click.option('-n', '--namespace', default=None)
@click.option('-i', '--id', type=str, default=None)
@click.option('-P', '--peering', 'peering_name', type=str, default=None, envvar='KOPF_RESUME_PEERING')
def resume(id, namespace, peering_name):
@click.option('-P', '--peering', 'peering_name', type=str, required=True, envvar='KOPF_RESUME_PEERING')
def resume(
id: Optional[str],
namespace: Optional[str],
peering_name: str,
) -> None:
""" Resume the resource handling in the cluster. """
cli_login()
ourselves = peering.Peer(
Expand Down
14 changes: 8 additions & 6 deletions kopf/clients/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class AccessError(Exception):
""" Raised when the operator cannot access the cluster API. """


def login(verify=False):
def login(verify: bool = False) -> None:
"""
Login to Kubernetes cluster, locally or remotely.

Expand Down Expand Up @@ -47,7 +47,7 @@ def login(verify=False):
login_client(verify=verify)


def login_pykube(verify=False):
def login_pykube(verify: bool = False) -> None:
global _pykube_cfg
try:
_pykube_cfg = pykube.KubeConfig.from_service_account()
Expand All @@ -63,7 +63,7 @@ def login_pykube(verify=False):
verify_pykube()


def login_client(verify=False):
def login_client(verify: bool = False) -> None:
import kubernetes.client
try:
kubernetes.config.load_incluster_config() # cluster env vars
Expand All @@ -79,7 +79,7 @@ def login_client(verify=False):
verify_client()


def verify_pykube():
def verify_pykube() -> None:
"""
Verify if login has succeeded, and the access configuration is still valid.

Expand All @@ -105,7 +105,7 @@ def verify_pykube():
"Please login or configure the tokens.")


def verify_client():
def verify_client() -> None:
"""
Verify if login has succeeded, and the access configuration is still valid.

Expand Down Expand Up @@ -133,6 +133,8 @@ def get_pykube_cfg() -> pykube.KubeConfig:


# TODO: add some caching, but keep kwargs in mind. Maybe add a key= for purpose/use-place?
def get_pykube_api(timeout=None) -> pykube.HTTPClient:
def get_pykube_api(
timeout: Optional[float] = None,
) -> pykube.HTTPClient:
kwargs = dict(timeout=timeout)
return pykube.HTTPClient(get_pykube_cfg(), **kwargs)
6 changes: 5 additions & 1 deletion kopf/clients/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
import pykube

from kopf.clients import auth
from kopf.structs import resources


def _make_cls(resource) -> Type[pykube.objects.APIObject]:
def _make_cls(
resource: resources.Resource,
) -> Type[pykube.objects.APIObject]:

api = auth.get_pykube_api()
api_resources = api.resource_list(resource.api_version)['resources']
resource_kind = next((r['kind'] for r in api_resources if r['name'] == resource.plural), None)
Expand Down
26 changes: 12 additions & 14 deletions kopf/clients/events.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import copy
import datetime
import logging

Expand All @@ -15,30 +16,27 @@
CUT_MESSAGE_INFIX = '...'


async def post_event(*, obj=None, ref=None, type, reason, message=''):
async def post_event(
*,
ref: bodies.ObjectReference,
type: str,
reason: str,
message: str = '',
) -> None:
"""
Issue an event for the object.

This is where they can also be accumulated, aggregated, grouped,
and where the rate-limits should be maintained. It can (and should)
be done by the client library, as it is done in the Go client.
"""

# Object reference - similar to the owner reference, but different.
if obj is not None and ref is not None:
raise TypeError("Only one of obj= and ref= is allowed for a posted event. Got both.")
if obj is None and ref is None:
raise TypeError("One of obj= and ref= is required for a posted event. Got none.")
if ref is None:
ref = bodies.build_object_reference(obj)

now = datetime.datetime.utcnow()

# See #164. For cluster-scoped objects, use the current namespace from the current context.
# It could be "default", but in some systems, we are limited to one specific namespace only.
namespace = ref.get('namespace') or auth.get_pykube_cfg().namespace
if not ref.get('namespace'):
ref = dict(ref, namespace=namespace)
namespace: str = ref.get('namespace') or auth.get_pykube_cfg().namespace
full_ref: bodies.ObjectReference = copy.copy(ref)
full_ref['namespace'] = namespace

# Prevent a common case of event posting errors but shortening the message.
if len(message) > MAX_MESSAGE_LENGTH:
Expand All @@ -62,7 +60,7 @@ async def post_event(*, obj=None, ref=None, type, reason, message=''):
'reportingInstance': 'dev',
'source' : {'component': 'kopf'}, # used in the "From" column in `kubectl describe`.

'involvedObject': ref,
'involvedObject': full_ref,

'firstTimestamp': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' -- seen in `kubectl describe ...`
'lastTimestamp': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' - seen in `kubectl get events`
Expand Down
45 changes: 34 additions & 11 deletions kopf/clients/fetching.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,70 @@
import enum

import pykube
import requests
from typing import TypeVar, Optional, Union, Collection, List, Tuple, cast

from kopf.clients import auth
from kopf.clients import classes
from kopf.structs import bodies
from kopf.structs import resources

_T = TypeVar('_T')


_UNSET_ = object()
class _UNSET(enum.Enum):
token = enum.auto()


def read_crd(*, resource, default=_UNSET_):
def read_crd(
*,
resource: resources.Resource,
default: Union[_T, _UNSET] = _UNSET.token,
) -> Union[bodies.Body, _T]:
try:
api = auth.get_pykube_api()
cls = pykube.CustomResourceDefinition
obj = cls.objects(api, namespace=None).get_by_name(name=resource.name)
return obj.obj
return cast(bodies.Body, obj.obj)

except pykube.ObjectDoesNotExist:
if default is not _UNSET_:
if not isinstance(default, _UNSET):
return default
raise
except requests.exceptions.HTTPError as e:
if e.response.status_code in [403, 404] and default is not _UNSET_:
if not isinstance(default, _UNSET) and e.response.status_code in [403, 404]:
return default
raise


def read_obj(*, resource, namespace=None, name=None, default=_UNSET_):
def read_obj(
*,
resource: resources.Resource,
namespace: Optional[str] = None,
name: Optional[str] = None,
default: Union[_T, _UNSET] = _UNSET.token,
) -> Union[bodies.Body, _T]:
try:
api = auth.get_pykube_api()
cls = classes._make_cls(resource=resource)
namespace = namespace if issubclass(cls, pykube.objects.NamespacedAPIObject) else None
obj = cls.objects(api, namespace=namespace).get_by_name(name=name)
return obj.obj
return cast(bodies.Body, obj.obj)
except pykube.ObjectDoesNotExist:
if default is not _UNSET_:
if not isinstance(default, _UNSET):
return default
raise
except requests.exceptions.HTTPError as e:
if e.response.status_code in [403, 404] and default is not _UNSET_:
if not isinstance(default, _UNSET) and e.response.status_code in [403, 404]:
return default
raise


def list_objs_rv(*, resource, namespace=None):
def list_objs_rv(
*,
resource: resources.Resource,
namespace: Optional[str] = None,
) -> Tuple[Collection[bodies.Body], str]:
"""
List the objects of specific resource type.

Expand All @@ -60,7 +83,7 @@ def list_objs_rv(*, resource, namespace=None):
lst = cls.objects(api, namespace=pykube.all if namespace is None else namespace)
rsp = lst.response

items = []
items: List[bodies.Body] = []
resource_version = rsp.get('metadata', {}).get('resourceVersion', None)
for item in rsp['items']:
# FIXME: fix in pykube to inject the missing item's fields from the list's metainfo.
Expand Down
Loading