From 699720c05d9c79fd73b22ebbf0b8aaaa3df8259e Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Wed, 2 Oct 2019 18:56:15 +0200 Subject: [PATCH 1/8] Convert all special marker objects to enum (as per Python typing docs) --- kopf/clients/fetching.py | 21 ++++++++++++++------- kopf/config.py | 5 +---- kopf/reactor/queueing.py | 18 +++++++++++------- kopf/structs/dicts.py | 20 ++++++++++++-------- tests/reactor/test_queueing.py | 2 +- 5 files changed, 39 insertions(+), 27 deletions(-) diff --git a/kopf/clients/fetching.py b/kopf/clients/fetching.py index 70957b34..3f5e0e6e 100644 --- a/kopf/clients/fetching.py +++ b/kopf/clients/fetching.py @@ -1,13 +1,20 @@ +import enum + import pykube import requests +from typing import TypeVar, Optional, Union, Collection, List, Tuple, cast from kopf.clients import auth from kopf.clients import classes -_UNSET_ = object() +_T = TypeVar('_T') + + +class _UNSET(enum.Enum): + token = enum.auto() -def read_crd(*, resource, default=_UNSET_): +def read_crd(*, resource, default=_UNSET.token): try: api = auth.get_pykube_api() cls = pykube.CustomResourceDefinition @@ -15,16 +22,16 @@ def read_crd(*, resource, default=_UNSET_): return obj.obj except pykube.ObjectDoesNotExist: - if default is not _UNSET_: + if not isinstance(default, _UNSET): return default raise except requests.exceptions.HTTPError as e: - if e.response.status_code in [403, 404] and default is not _UNSET_: + if not isinstance(default, _UNSET) and e.response.status_code in [403, 404]: return default raise -def read_obj(*, resource, namespace=None, name=None, default=_UNSET_): +def read_obj(*, resource, namespace=None, name=None, default=_UNSET.token): try: api = auth.get_pykube_api() cls = classes._make_cls(resource=resource) @@ -32,11 +39,11 @@ def read_obj(*, resource, namespace=None, name=None, default=_UNSET_): obj = cls.objects(api, namespace=namespace).get_by_name(name=name) return obj.obj except pykube.ObjectDoesNotExist: - if default is not _UNSET_: + if not isinstance(default, _UNSET): return default raise except requests.exceptions.HTTPError as e: - if e.response.status_code in [403, 404] and default is not _UNSET_: + if not isinstance(default, _UNSET) and e.response.status_code in [403, 404]: return default raise diff --git a/kopf/config.py b/kopf/config.py index 7c7a74ef..b4818ed6 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -8,14 +8,11 @@ format = '[%(asctime)s] %(name)-20.20s [%(levelname)-8.8s] %(message)s' +# Deprecated: use ``logging.*`` constants instead. Kept here for backward-compatibility. LOGLEVEL_INFO = logging.INFO -""" Event loglevel to log all events. """ LOGLEVEL_WARNING = logging.WARNING -""" Event loglevel to log all events except informational. """ LOGLEVEL_ERROR = logging.ERROR -""" Event loglevel to log only errors and critical events. """ LOGLEVEL_CRITICAL = logging.CRITICAL -""" Event loglevel to log only critical events(basically - no events). """ def configure(debug=None, verbose=None, quiet=None): diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index 03d3bbd5..ec9ea052 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -24,6 +24,7 @@ """ import asyncio +import enum import logging import time from typing import Callable, Tuple, Union, MutableMapping, NewType, NamedTuple @@ -37,6 +38,12 @@ logger = logging.getLogger(__name__) +# An end-of-stream marker sent from the watcher to the workers. 
+# See: https://www.python.org/dev/peps/pep-0484/#support-for-singleton-types-in-unions +class EOS(enum.Enum): + token = enum.auto() + + class Stream(NamedTuple): """ A single object's stream of watch-events, with some extra helpers. """ watchevents: asyncio.Queue @@ -47,9 +54,6 @@ class Stream(NamedTuple): ObjectRef = Tuple[resources.Resource, ObjectUid] Streams = MutableMapping[ObjectRef, Stream] -EOS = object() -""" An end-of-stream marker sent from the watcher to the workers. """ - # TODO: add the label_selector support for the dev-mode? async def watcher( @@ -137,13 +141,13 @@ async def worker( next_event = await asyncio.wait_for( watchevents.get(), timeout=config.WorkersConfig.worker_batch_window) - shouldstop = shouldstop or next_event is EOS - event = prev_event if next_event is EOS else next_event + shouldstop = shouldstop or isinstance(next_event, EOS) + event = prev_event if isinstance(next_event, EOS) else next_event except asyncio.TimeoutError: pass # Exit gracefully and immediately on the end-of-stream marker sent by the watcher. - if event is EOS: + if isinstance(event, EOS): break # Try the handler. In case of errors, show the error, but continue the queue processing. @@ -168,7 +172,7 @@ async def _wait_for_depletion(*, scheduler: aiojobs.Scheduler, streams: Streams) # Notify all the workers to finish now. Wake them up if they are waiting in the queue-getting. for stream in streams.values(): - await stream.watchevents.put(EOS) + await stream.watchevents.put(EOS.token) # Wait for the queues to be depleted, but only if there are some workers running. # Continue with the tasks termination if the timeout is reached, no matter the queues. diff --git a/kopf/structs/dicts.py b/kopf/structs/dicts.py index 306e6f77..5ee79cc1 100644 --- a/kopf/structs/dicts.py +++ b/kopf/structs/dicts.py @@ -2,13 +2,18 @@ Some basic dicts and field-in-a-dict manipulation helpers. """ import collections.abc +import enum from typing import (Any, Union, MutableMapping, Mapping, Tuple, List, Text, - Iterable, Iterator, Optional) + Iterable, Iterator, Optional, TypeVar) FieldPath = Tuple[str, ...] FieldSpec = Union[None, Text, FieldPath, List[str]] -_UNSET = object() +_T = TypeVar('_T') + + +class _UNSET(enum.Enum): + token = enum.auto() def parse_field( @@ -37,10 +42,10 @@ def parse_field( def resolve( d: Mapping, field: FieldSpec, - default: Any = _UNSET, + default: Union[_T, _UNSET] = _UNSET.token, *, assume_empty: bool = False, -): +) -> Union[Any, _T]: """ Retrieve a nested sub-field from a dict. 
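For readers unfamiliar with the pattern used above: PEP 484 suggests a single-member enum as a sentinel value precisely because, unlike a bare `object()`, it can participate in type annotations such as `Union[_T, _UNSET]`, and `isinstance()` checks narrow that union for the type checker. A minimal self-contained sketch of the idiom shared by `fetching.py` and `dicts.py` (simplified to a flat dict lookup; the real `resolve()` also walks nested paths and honours `assume_empty`):

    import enum
    from typing import Any, Mapping, TypeVar, Union

    _T = TypeVar('_T')

    class _UNSET(enum.Enum):
        token = enum.auto()

    def lookup(d: Mapping[str, Any], key: str,
               default: Union[_T, _UNSET] = _UNSET.token) -> Union[Any, _T]:
        try:
            return d[key]
        except KeyError:
            # isinstance() narrows Union[_T, _UNSET] to _T for the type checker,
            # which the old `default is not _UNSET_` identity check could not express.
            if not isinstance(default, _UNSET):
                return default
            raise
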
@@ -54,7 +59,7 @@ def resolve( try: result = d for key in path: - if result is None and assume_empty and default is not _UNSET: + if result is None and assume_empty and not isinstance(default, _UNSET): return default elif isinstance(result, collections.abc.Mapping): result = result[key] @@ -62,10 +67,9 @@ def resolve( raise TypeError(f"The structure is not a dict with field {key!r}: {result!r}") return result except KeyError: - if default is _UNSET: - raise - else: + if not isinstance(default, _UNSET): return default + raise def ensure( diff --git a/tests/reactor/test_queueing.py b/tests/reactor/test_queueing.py index 8d609af0..a0848a23 100644 --- a/tests/reactor/test_queueing.py +++ b/tests/reactor/test_queueing.py @@ -85,7 +85,7 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, handler, queue_events.append(streams[key].watchevents.get_nowait()) assert len(queue_events) == cnt + 1 - assert queue_events[-1] is EOS + assert queue_events[-1] is EOS.token assert all(queue_event['object']['metadata']['uid'] == uid for queue_event in queue_events[:-1]) From 68a8649599268c75ddd968cb8cd97e76c818a490 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Wed, 2 Oct 2019 20:19:34 +0200 Subject: [PATCH 2/8] Remove an unused arg in favour of actually used one --- kopf/clients/events.py | 11 +---------- tests/k8s/test_events.py | 16 +++++++++++----- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/kopf/clients/events.py b/kopf/clients/events.py index 4a94ddb6..85348d2e 100644 --- a/kopf/clients/events.py +++ b/kopf/clients/events.py @@ -15,7 +15,7 @@ CUT_MESSAGE_INFIX = '...' -async def post_event(*, obj=None, ref=None, type, reason, message=''): +async def post_event(*, ref, type, reason, message=''): """ Issue an event for the object. @@ -23,15 +23,6 @@ async def post_event(*, obj=None, ref=None, type, reason, message=''): and where the rate-limits should be maintained. It can (and should) be done by the client library, as it is done in the Go client. """ - - # Object reference - similar to the owner reference, but different. - if obj is not None and ref is not None: - raise TypeError("Only one of obj= and ref= is allowed for a posted event. Got both.") - if obj is None and ref is None: - raise TypeError("One of obj= and ref= is required for a posted event. Got none.") - if ref is None: - ref = bodies.build_object_reference(obj) - now = datetime.datetime.utcnow() # See #164. For cluster-scoped objects, use the current namespace from the current context. 
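With the `obj=`/`ref=` ambiguity gone, reference-building moves to the call sites, as the updated tests below show. A hypothetical caller under the new signature (`notify` is an illustrative wrapper, not a kopf function):

    from kopf.clients.events import post_event
    from kopf.structs.bodies import build_object_reference

    async def notify(obj) -> None:
        # The removed obj= branch becomes the caller's one-liner:
        ref = build_object_reference(obj)
        await post_event(ref=ref, type='Normal', reason='SomeReason', message='...')
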
diff --git a/tests/k8s/test_events.py b/tests/k8s/test_events.py index 515daea9..59c8dfd1 100644 --- a/tests/k8s/test_events.py +++ b/tests/k8s/test_events.py @@ -4,6 +4,7 @@ import requests from kopf.clients.events import post_event +from kopf.structs.bodies import build_object_reference async def test_posting(req_mock): @@ -12,7 +13,8 @@ async def test_posting(req_mock): 'metadata': {'namespace': 'ns', 'name': 'name', 'uid': 'uid'}} - await post_event(obj=obj, type='type', reason='reason', message='message') + ref = build_object_reference(obj) + await post_event(ref=ref, type='type', reason='reason', message='message') assert req_mock.post.called assert req_mock.post.call_count == 1 @@ -35,7 +37,8 @@ async def test_type_is_v1_not_v1beta1(req_mock): 'metadata': {'namespace': 'ns', 'name': 'name', 'uid': 'uid'}} - await post_event(obj=obj, type='type', reason='reason', message='message') + ref = build_object_reference(obj) + await post_event(ref=ref, type='type', reason='reason', message='message') assert req_mock.post.called @@ -54,7 +57,8 @@ async def test_api_errors_logged_but_suppressed(req_mock, assert_logs): 'metadata': {'namespace': 'ns', 'name': 'name', 'uid': 'uid'}} - await post_event(obj=obj, type='type', reason='reason', message='message') + ref = build_object_reference(obj) + await post_event(ref=ref, type='type', reason='reason', message='message') assert req_mock.post.called assert_logs([ @@ -71,9 +75,10 @@ async def test_regular_errors_escalate(req_mock): 'metadata': {'namespace': 'ns', 'name': 'name', 'uid': 'uid'}} + ref = build_object_reference(obj) with pytest.raises(Exception) as excinfo: - await post_event(obj=obj, type='type', reason='reason', message='message') + await post_event(ref=ref, type='type', reason='reason', message='message') assert excinfo.value is error @@ -84,8 +89,9 @@ async def test_message_is_cut_to_max_length(req_mock): 'metadata': {'namespace': 'ns', 'name': 'name', 'uid': 'uid'}} + ref = build_object_reference(obj) message = 'start' + ('x' * 2048) + 'end' - await post_event(obj=obj, type='type', reason='reason', message=message) + await post_event(ref=ref, type='type', reason='reason', message=message) data = json.loads(req_mock.post.call_args_list[0][1]['data']) assert len(data['message']) <= 1024 # max supported API message length From b1491893e2d44f0bc7ad07319b9834116e85b69f Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Wed, 2 Oct 2019 20:22:34 +0200 Subject: [PATCH 3/8] Fix field resolution on addition/removal in field-handlers --- kopf/reactor/handling.py | 4 ++-- tests/basic-structs/test_cause.py | 3 ++- tests/causation/test_detection.py | 1 + 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 09d33a5f..346b204d 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -465,8 +465,8 @@ async def _call_handler( """ # For the field-handlers, the old/new/diff values must match the field, not the whole object. 
- old = cause.old if handler.field is None else dicts.resolve(cause.old, handler.field, None) - new = cause.new if handler.field is None else dicts.resolve(cause.new, handler.field, None) + old = cause.old if handler.field is None else dicts.resolve(cause.old, handler.field, None, assume_empty=True) + new = cause.new if handler.field is None else dicts.resolve(cause.new, handler.field, None, assume_empty=True) diff = cause.diff if handler.field is None else diffs.reduce(cause.diff, handler.field) cause = causation.enrich_cause(cause=cause, old=old, new=new, diff=diff) diff --git a/tests/basic-structs/test_cause.py b/tests/basic-structs/test_cause.py index 52aeb5c7..8e1f89c1 100644 --- a/tests/basic-structs/test_cause.py +++ b/tests/basic-structs/test_cause.py @@ -61,6 +61,7 @@ def test_required_args(mocker): assert cause.initial is initial assert cause.body is body assert cause.patch is patch - assert cause.diff is None + assert cause.diff is not None + assert not cause.diff assert cause.old is None assert cause.new is None diff --git a/tests/causation/test_detection.py b/tests/causation/test_detection.py index b4bfcdf5..33c49c1e 100644 --- a/tests/causation/test_detection.py +++ b/tests/causation/test_detection.py @@ -5,6 +5,7 @@ from kopf.reactor.causation import CREATE, UPDATE, DELETE, NOOP, FREE, GONE, ACQUIRE, RELEASE from kopf.reactor.causation import detect_cause +from kopf.structs.diffs import Diff from kopf.structs.finalizers import FINALIZER from kopf.structs.lastseen import LAST_SEEN_ANNOTATION From c5f00c101c676cce2aaf941e2cf6a057e50b578d Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Wed, 2 Oct 2019 20:04:52 +0200 Subject: [PATCH 4/8] Add type-hints all over the code --- kopf/cli.py | 39 ++- kopf/clients/auth.py | 14 +- kopf/clients/classes.py | 6 +- kopf/clients/events.py | 17 +- kopf/clients/fetching.py | 28 +- kopf/clients/patching.py | 23 +- kopf/clients/watching.py | 46 +++- kopf/config.py | 24 +- kopf/engines/logging.py | 32 ++- kopf/engines/peering.py | 42 +-- kopf/engines/posting.py | 65 +++-- kopf/on.py | 84 +++--- kopf/reactor/causation.py | 84 ++---- kopf/reactor/handling.py | 75 +++--- kopf/reactor/invocation.py | 35 ++- kopf/reactor/lifecycles.py | 43 +++- kopf/reactor/queueing.py | 36 ++- kopf/reactor/registries.py | 270 ++++++++++++++------ kopf/reactor/running.py | 91 +++++-- kopf/reactor/state.py | 107 ++++++-- kopf/structs/bodies.py | 157 +++++++++++- kopf/structs/dicts.py | 47 ++-- kopf/structs/diffs.py | 79 ++++-- kopf/structs/finalizers.py | 22 +- kopf/structs/lastseen.py | 79 +++--- kopf/structs/patches.py | 16 ++ kopf/structs/resources.py | 4 +- kopf/toolkits/hierarchies.py | 61 +++-- kopf/toolkits/runner.py | 64 +++-- kopf/utilities/loaders.py | 8 +- setup.py | 1 + tests/hierarchies/test_owner_referencing.py | 23 +- tests/invocations/test_callbacks.py | 2 +- tests/lifecycles/test_global_defaults.py | 4 +- tests/test_lastseen.py | 22 +- 35 files changed, 1235 insertions(+), 515 deletions(-) create mode 100644 kopf/structs/patches.py diff --git a/kopf/cli.py b/kopf/cli.py index 3b29126c..9494eebf 100644 --- a/kopf/cli.py +++ b/kopf/cli.py @@ -1,5 +1,6 @@ import asyncio import functools +from typing import Any, Optional, Callable, List import click @@ -10,7 +11,7 @@ from kopf.utilities import loaders -def cli_login(): +def cli_login() -> None: try: auth.login(verify=True) except auth.LoginError as e: @@ -19,13 +20,13 @@ def cli_login(): raise click.ClickException(str(e)) -def logging_options(fn): +def logging_options(fn: Callable[..., Any]) -> 
Callable[..., Any]: """ A decorator to configure logging in all command in the same way.""" @click.option('-v', '--verbose', is_flag=True) @click.option('-d', '--debug', is_flag=True) @click.option('-q', '--quiet', is_flag=True) @functools.wraps(fn) # to preserve other opts/args - def wrapper(verbose, quiet, debug, *args, **kwargs): + def wrapper(verbose: bool, quiet: bool, debug: bool, *args: Any, **kwargs: Any) -> Any: config.configure(debug=debug, verbose=verbose, quiet=quiet) return fn(*args, **kwargs) @@ -36,7 +37,7 @@ def wrapper(verbose, quiet, debug, *args, **kwargs): @click.group(name='kopf', context_settings=dict( auto_envvar_prefix='KOPF', )) -def main(): +def main() -> None: pass @@ -49,7 +50,14 @@ def main(): @click.option('-p', '--priority', type=int, default=0) @click.option('-m', '--module', 'modules', multiple=True) @click.argument('paths', nargs=-1) -def run(paths, modules, peering_name, priority, standalone, namespace): +def run( + paths: List[str], + modules: List[str], + peering_name: Optional[str], + priority: int, + standalone: bool, + namespace: Optional[str], +) -> None: """ Start an operator process and handle all the requests. """ cli_login() loaders.preload( @@ -69,11 +77,18 @@ def run(paths, modules, peering_name, priority, standalone, namespace): @click.option('-n', '--namespace', default=None) @click.option('-i', '--id', type=str, default=None) @click.option('--dev', 'priority', flag_value=666) -@click.option('-P', '--peering', 'peering_name', type=str, default=None, envvar='KOPF_FREEZE_PEERING') -@click.option('-p', '--priority', type=int, default=100) +@click.option('-P', '--peering', 'peering_name', type=str, required=True, envvar='KOPF_FREEZE_PEERING') +@click.option('-p', '--priority', type=int, default=100, required=True) @click.option('-t', '--lifetime', type=int, required=True) @click.option('-m', '--message', type=str) -def freeze(id, message, lifetime, namespace, peering_name, priority): +def freeze( + id: Optional[str], + message: Optional[str], + lifetime: int, + namespace: Optional[str], + peering_name: str, + priority: int, +) -> None: """ Freeze the resource handling in the cluster. """ cli_login() ourserlves = peering.Peer( @@ -91,8 +106,12 @@ def freeze(id, message, lifetime, namespace, peering_name, priority): @logging_options @click.option('-n', '--namespace', default=None) @click.option('-i', '--id', type=str, default=None) -@click.option('-P', '--peering', 'peering_name', type=str, default=None, envvar='KOPF_RESUME_PEERING') -def resume(id, namespace, peering_name): +@click.option('-P', '--peering', 'peering_name', type=str, required=True, envvar='KOPF_RESUME_PEERING') +def resume( + id: Optional[str], + namespace: Optional[str], + peering_name: str, +) -> None: """ Resume the resource handling in the cluster. """ cli_login() ourselves = peering.Peer( diff --git a/kopf/clients/auth.py b/kopf/clients/auth.py index aad9e640..6591e29f 100644 --- a/kopf/clients/auth.py +++ b/kopf/clients/auth.py @@ -19,7 +19,7 @@ class AccessError(Exception): """ Raised when the operator cannot access the cluster API. """ -def login(verify=False): +def login(verify: bool = False) -> None: """ Login to Kubernetes cluster, locally or remotely. 
@@ -47,7 +47,7 @@ def login(verify=False): login_client(verify=verify) -def login_pykube(verify=False): +def login_pykube(verify: bool = False) -> None: global _pykube_cfg try: _pykube_cfg = pykube.KubeConfig.from_service_account() @@ -63,7 +63,7 @@ def login_pykube(verify=False): verify_pykube() -def login_client(verify=False): +def login_client(verify: bool = False) -> None: import kubernetes.client try: kubernetes.config.load_incluster_config() # cluster env vars @@ -79,7 +79,7 @@ def login_client(verify=False): verify_client() -def verify_pykube(): +def verify_pykube() -> None: """ Verify if login has succeeded, and the access configuration is still valid. @@ -105,7 +105,7 @@ def verify_pykube(): "Please login or configure the tokens.") -def verify_client(): +def verify_client() -> None: """ Verify if login has succeeded, and the access configuration is still valid. @@ -133,6 +133,8 @@ def get_pykube_cfg() -> pykube.KubeConfig: # TODO: add some caching, but keep kwargs in mind. Maybe add a key= for purpose/use-place? -def get_pykube_api(timeout=None) -> pykube.HTTPClient: +def get_pykube_api( + timeout: Optional[float] = None, +) -> pykube.HTTPClient: kwargs = dict(timeout=timeout) return pykube.HTTPClient(get_pykube_cfg(), **kwargs) diff --git a/kopf/clients/classes.py b/kopf/clients/classes.py index b4e5c54a..8ee4275a 100644 --- a/kopf/clients/classes.py +++ b/kopf/clients/classes.py @@ -3,9 +3,13 @@ import pykube from kopf.clients import auth +from kopf.structs import resources -def _make_cls(resource) -> Type[pykube.objects.APIObject]: +def _make_cls( + resource: resources.Resource, +) -> Type[pykube.objects.APIObject]: + api = auth.get_pykube_api() api_resources = api.resource_list(resource.api_version)['resources'] resource_kind = next((r['kind'] for r in api_resources if r['name'] == resource.plural), None) diff --git a/kopf/clients/events.py b/kopf/clients/events.py index 85348d2e..82e7d587 100644 --- a/kopf/clients/events.py +++ b/kopf/clients/events.py @@ -1,4 +1,5 @@ import asyncio +import copy import datetime import logging @@ -15,7 +16,13 @@ CUT_MESSAGE_INFIX = '...' -async def post_event(*, ref, type, reason, message=''): +async def post_event( + *, + ref: bodies.ObjectReference, + type: str, + reason: str, + message: str = '', +) -> None: """ Issue an event for the object. @@ -27,9 +34,9 @@ async def post_event(*, ref, type, reason, message=''): # See #164. For cluster-scoped objects, use the current namespace from the current context. # It could be "default", but in some systems, we are limited to one specific namespace only. - namespace = ref.get('namespace') or auth.get_pykube_cfg().namespace - if not ref.get('namespace'): - ref = dict(ref, namespace=namespace) + namespace: str = ref.get('namespace') or auth.get_pykube_cfg().namespace + full_ref: bodies.ObjectReference = copy.copy(ref) + full_ref['namespace'] = namespace # Prevent a common case of event posting errors but shortening the message. if len(message) > MAX_MESSAGE_LENGTH: @@ -53,7 +60,7 @@ async def post_event(*, ref, type, reason, message=''): 'reportingInstance': 'dev', 'source' : {'component': 'kopf'}, # used in the "From" column in `kubectl describe`. 
- 'involvedObject': ref, + 'involvedObject': full_ref, 'firstTimestamp': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' -- seen in `kubectl describe ...` 'lastTimestamp': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' - seen in `kubectl get events` diff --git a/kopf/clients/fetching.py b/kopf/clients/fetching.py index 3f5e0e6e..d6211244 100644 --- a/kopf/clients/fetching.py +++ b/kopf/clients/fetching.py @@ -6,6 +6,8 @@ from kopf.clients import auth from kopf.clients import classes +from kopf.structs import bodies +from kopf.structs import resources _T = TypeVar('_T') @@ -14,12 +16,16 @@ class _UNSET(enum.Enum): token = enum.auto() -def read_crd(*, resource, default=_UNSET.token): +def read_crd( + *, + resource: resources.Resource, + default: Union[_T, _UNSET] = _UNSET.token, +) -> Union[bodies.Body, _T]: try: api = auth.get_pykube_api() cls = pykube.CustomResourceDefinition obj = cls.objects(api, namespace=None).get_by_name(name=resource.name) - return obj.obj + return cast(bodies.Body, obj.obj) except pykube.ObjectDoesNotExist: if not isinstance(default, _UNSET): @@ -31,13 +37,19 @@ def read_crd(*, resource, default=_UNSET.token): raise -def read_obj(*, resource, namespace=None, name=None, default=_UNSET.token): +def read_obj( + *, + resource: resources.Resource, + namespace: Optional[str] = None, + name: Optional[str] = None, + default: Union[_T, _UNSET] = _UNSET.token, +) -> Union[bodies.Body, _T]: try: api = auth.get_pykube_api() cls = classes._make_cls(resource=resource) namespace = namespace if issubclass(cls, pykube.objects.NamespacedAPIObject) else None obj = cls.objects(api, namespace=namespace).get_by_name(name=name) - return obj.obj + return cast(bodies.Body, obj.obj) except pykube.ObjectDoesNotExist: if not isinstance(default, _UNSET): return default @@ -48,7 +60,11 @@ def read_obj(*, resource, namespace=None, name=None, default=_UNSET.token): raise -def list_objs_rv(*, resource, namespace=None): +def list_objs_rv( + *, + resource: resources.Resource, + namespace: Optional[str] = None, +) -> Tuple[Collection[bodies.Body], str]: """ List the objects of specific resource type. @@ -67,7 +83,7 @@ def list_objs_rv(*, resource, namespace=None): lst = cls.objects(api, namespace=pykube.all if namespace is None else namespace) rsp = lst.response - items = [] + items: List[bodies.Body] = [] resource_version = rsp.get('metadata', {}).get('resourceVersion', None) for item in rsp['items']: # FIXME: fix in pykube to inject the missing item's fields from the list's metainfo. diff --git a/kopf/clients/patching.py b/kopf/clients/patching.py index cd7d1a22..808431f5 100644 --- a/kopf/clients/patching.py +++ b/kopf/clients/patching.py @@ -1,4 +1,5 @@ import asyncio +from typing import Optional, cast import pykube import requests @@ -6,9 +7,19 @@ from kopf import config from kopf.clients import auth from kopf.clients import classes - - -async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): +from kopf.structs import bodies +from kopf.structs import patches +from kopf.structs import resources + + +async def patch_obj( + *, + resource: resources.Resource, + patch: patches.Patch, + namespace: Optional[str] = None, + name: Optional[str] = None, + body: Optional[bodies.Body] = None, +) -> None: """ Patch a resource of specific kind. 
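Note that `kopf/structs/patches.py` is created by this patch series, but its body is not included in this excerpt. Judging by its usage here (`patches.Patch()`, `patches.Patch({...})`, `.update(...)`, item assignment), a thin dict subclass would suffice; this is an assumption about the new file, not its actual contents:

    from typing import Any, Dict

    class Patch(Dict[str, Any]):
        """ Accumulates patch data before it is sent to the K8s API. """
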
@@ -26,9 +37,9 @@ async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): namespace = body.get('metadata', {}).get('namespace') if body is not None else namespace name = body.get('metadata', {}).get('name') if body is not None else name if body is None: - nskw = {} if namespace is None else dict(namespace=namespace) - body = {'metadata': {'name': name}} - body['metadata'].update(nskw) + body = cast(bodies.Body, {'metadata': {'name': name}}) + if namespace is not None: + body['metadata']['namespace'] = namespace api = auth.get_pykube_api() cls = classes._make_cls(resource=resource) diff --git a/kopf/clients/watching.py b/kopf/clients/watching.py index 72a13b8e..558b1cb4 100644 --- a/kopf/clients/watching.py +++ b/kopf/clients/watching.py @@ -19,8 +19,9 @@ """ import asyncio +import concurrent.futures import logging -from typing import Union +from typing import Optional, Iterator, AsyncIterator, cast import pykube @@ -28,6 +29,7 @@ from kopf.clients import auth from kopf.clients import classes from kopf.clients import fetching +from kopf.structs import bodies from kopf.structs import resources logger = logging.getLogger(__name__) @@ -46,32 +48,38 @@ class StopStreaming(RuntimeError): """ -def streaming_next(src): +def streaming_next(__src: Iterator[bodies.RawEvent]) -> bodies.RawEvent: """ Same as `next`, but replaces the `StopIteration` with `StopStreaming`. """ try: - return next(src) + return next(__src) except StopIteration as e: raise StopStreaming(str(e)) -async def streaming_aiter(src, loop=None, executor=None): +async def streaming_aiter( + __src: Iterator[bodies.RawEvent], + *, + loop: Optional[asyncio.AbstractEventLoop] = None, + executor: Optional[concurrent.futures.Executor] = None, +) -> AsyncIterator[bodies.RawEvent]: """ Same as `iter`, but asynchronous and stops on `StopStreaming`, not on `StopIteration`. """ loop = loop if loop is not None else asyncio.get_event_loop() while True: try: - yield await loop.run_in_executor(executor, streaming_next, src) + yield await loop.run_in_executor(executor, streaming_next, __src) except StopStreaming: return async def infinite_watch( + *, resource: resources.Resource, - namespace: Union[None, str], -): + namespace: Optional[str], +) -> AsyncIterator[bodies.Event]: """ Stream the watch-events infinitely. @@ -89,9 +97,10 @@ async def infinite_watch( async def streaming_watch( + *, resource: resources.Resource, - namespace: Union[None, str], -): + namespace: Optional[str], +) -> AsyncIterator[bodies.Event]: """ Stream the watch-events from one single API watch-call. """ @@ -112,7 +121,7 @@ async def streaming_watch( # "410 Gone" is for the "resource version too old" error, we must restart watching. # The resource versions are lost by k8s after few minutes (as per the official doc). # The error occurs when there is nothing happening for few minutes. This is normal. - if event['type'] == 'ERROR' and event['object']['code'] == 410: + if event['type'] == 'ERROR' and cast(bodies.Error, event['object'])['code'] == 410: logger.debug("Restarting the watch-stream for %r", resource) break # out of for-cycle, to the while-true-cycle. @@ -125,11 +134,17 @@ async def streaming_watch( logger.warning("Ignoring an unsupported event type: %r", event) continue - # Yield normal events to the consumer. - yield event + # Yield normal events to the consumer. Errors are already filtered out. 
+ yield cast(bodies.Event, event) -def watch_objs(*, resource, namespace=None, timeout=None, since=None): +def watch_objs( + *, + resource: resources.Resource, + namespace: Optional[str] = None, + timeout: Optional[float] = None, + since: Optional[str] = None, +) -> Iterator[bodies.RawEvent]: """ Watch objects of a specific resource type. @@ -152,4 +167,7 @@ def watch_objs(*, resource, namespace=None, timeout=None, since=None): namespace = namespace if issubclass(cls, pykube.objects.NamespacedAPIObject) else None lst = cls.objects(api, namespace=pykube.all if namespace is None else namespace) src = lst.watch(since=since, params=params) - return iter({'type': event.type, 'object': event.object.obj} for event in src) + return iter(cast(bodies.RawEvent, { + 'type': event.type, + 'object': event.object.obj, + }) for event in src) diff --git a/kopf/config.py b/kopf/config.py index b4818ed6..c6ec29ef 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -15,7 +15,11 @@ LOGLEVEL_CRITICAL = logging.CRITICAL -def configure(debug=None, verbose=None, quiet=None): +def configure( + debug: Optional[bool] = None, + verbose: Optional[bool] = None, + quiet: Optional[bool] = None, +) -> None: log_level = 'DEBUG' if debug or verbose else 'WARNING' if quiet else 'INFO' logger = logging.getLogger() @@ -44,12 +48,12 @@ def configure(debug=None, verbose=None, quiet=None): del logger.handlers[1:] # everything except the default NullHandler # Prevent the low-level logging unless in the debug verbosity mode. Keep only the operator's messages. - logging.getLogger('urllib3').propagate = debug - logging.getLogger('asyncio').propagate = debug - logging.getLogger('kubernetes').propagate = debug + logging.getLogger('urllib3').propagate = bool(debug) + logging.getLogger('asyncio').propagate = bool(debug) + logging.getLogger('kubernetes').propagate = bool(debug) loop = asyncio.get_event_loop() - loop.set_debug(debug) + loop.set_debug(bool(debug)) class EventsConfig: @@ -57,7 +61,7 @@ class EventsConfig: Used to configure events sending behaviour. """ - events_loglevel = LOGLEVEL_INFO + events_loglevel: int = logging.INFO """ What events should be logged. """ @@ -92,7 +96,7 @@ def get_syn_executor() -> concurrent.futures.ThreadPoolExecutor: return WorkersConfig.threadpool_executor @staticmethod - def set_synchronous_tasks_threadpool_limit(new_limit: int): + def set_synchronous_tasks_threadpool_limit(new_limit: int) -> None: """ Call this static method at any time to change synchronous_tasks_threadpool_limit in runtime. """ @@ -101,7 +105,7 @@ def set_synchronous_tasks_threadpool_limit(new_limit: int): WorkersConfig.synchronous_tasks_threadpool_limit = new_limit if WorkersConfig.threadpool_executor: - WorkersConfig.threadpool_executor._max_workers = new_limit + WorkersConfig.threadpool_executor._max_workers = new_limit # type: ignore class WatchersConfig: @@ -109,8 +113,8 @@ class WatchersConfig: Used to configure the K8s API watchers and streams. """ - default_stream_timeout = None + default_stream_timeout: Optional[float] = None """ The maximum duration of one streaming request. Patched in some tests. """ - watcher_retry_delay = 0.1 + watcher_retry_delay: float = 0.1 """ How long should a pause be between watch requests (to prevent flooding). 
""" diff --git a/kopf/engines/logging.py b/kopf/engines/logging.py index 42cae943..eff39d5d 100644 --- a/kopf/engines/logging.py +++ b/kopf/engines/logging.py @@ -10,17 +10,19 @@ """ import copy import logging +from typing import Tuple, MutableMapping, Any from kopf import config from kopf.engines import posting +from kopf.structs import bodies class ObjectPrefixingFormatter(logging.Formatter): """ An utility to prefix the per-object log messages. """ - def format(self, record): + def format(self, record: logging.LogRecord) -> str: if hasattr(record, 'k8s_ref'): - ref = record.k8s_ref + ref = getattr(record, 'k8s_ref') prefix = f"[{ref.get('namespace', '')}/{ref.get('name', '')}]" record = copy.copy(record) # shallow record.msg = f"{prefix} {record.msg}" @@ -32,22 +34,23 @@ class K8sPoster(logging.Handler): A handler to post all log messages as K8s events. """ - def createLock(self): + def createLock(self) -> None: # Save some time on unneeded locks. Events are posted in the background. # We only put events to the queue, which is already lock-protected. self.lock = None - def filter(self, record): + def filter(self, record: logging.LogRecord) -> bool: # Only those which have a k8s object referred (see: `ObjectLogger`). # Otherwise, we have nothing to post, and nothing to do. level_ok = record.levelno >= config.EventsConfig.events_loglevel has_ref = hasattr(record, 'k8s_ref') - skipped = hasattr(record, 'k8s_skip') and record.k8s_skip + skipped = hasattr(record, 'k8s_skip') and getattr(record, 'k8s_skip') return level_ok and has_ref and not skipped and super().filter(record) - def emit(self, record): + def emit(self, record: logging.LogRecord) -> None: # Same try-except as in e.g. `logging.StreamHandler`. try: + ref = getattr(record, 'k8s_ref') type = ( "Debug" if record.levelno <= logging.DEBUG else "Normal" if record.levelno <= logging.INFO else @@ -58,7 +61,7 @@ def emit(self, record): reason = 'Logging' message = self.format(record) posting.enqueue( - ref=record.k8s_ref, + ref=ref, type=type, reason=reason, message=message) @@ -82,7 +85,7 @@ class ObjectLogger(logging.LoggerAdapter): (e.g. in case of background posting via the queue; see `K8sPoster`). """ - def __init__(self, *, body): + def __init__(self, *, body: bodies.Body): super().__init__(logger, dict( k8s_skip=False, k8s_ref=dict( @@ -94,13 +97,22 @@ def __init__(self, *, body): ), )) - def process(self, msg, kwargs): + def process(self, + msg: str, + kwargs: MutableMapping[str, Any], + ) -> Tuple[str, MutableMapping[str, Any]]: # Native logging overwrites the message's extra with the adapter's extra. # We merge them, so that both message's & adapter's extras are available. 
kwargs["extra"] = dict(self.extra, **kwargs.get('extra', {})) return msg, kwargs - def log(self, level, msg, *args, local=False, **kwargs): + def log(self, + level: int, + msg: str, + *args: Any, + local: bool = False, + **kwargs: Any, + ) -> None: if local: kwargs['extra'] = dict(kwargs.pop('extra', {}), k8s_skip=True) super().log(level, msg, *args, **kwargs) diff --git a/kopf/engines/peering.py b/kopf/engines/peering.py index e26a146e..91020bd4 100644 --- a/kopf/engines/peering.py +++ b/kopf/engines/peering.py @@ -36,12 +36,14 @@ import os import random import socket -from typing import Iterable, Mapping, Optional, Union +from typing import Any, Dict, Iterable, Optional, Union, NoReturn import iso8601 from kopf.clients import fetching from kopf.clients import patching +from kopf.structs import bodies +from kopf.structs import patches from kopf.structs import resources logger = logging.getLogger(__name__) @@ -58,14 +60,16 @@ class Peer: def __init__(self, - id: str, *, + id: str, + *, name: str, priority: int = 0, lastseen: Optional[str] = None, lifetime: int = 60, namespace: Optional[str] = None, legacy: bool = False, - **kwargs): # for the forward-compatibility with the new fields + **_: Any, # for the forward-compatibility with the new fields + ): super().__init__() self.id = id self.name = name @@ -81,11 +85,11 @@ def __init__(self, self.is_dead = self.deadline <= datetime.datetime.utcnow() self.legacy = legacy - def __repr__(self): + def __repr__(self) -> str: return f"{self.__class__.__name__}({self.id}, namespace={self.namespace}, priority={self.priority}, lastseen={self.lastseen}, lifetime={self.lifetime})" @property - def resource(self): + def resource(self) -> resources.Resource: return LEGACY_PEERING_RESOURCE if self.legacy else CLUSTER_PEERING_RESOURCE if self.namespace is None else NAMESPACED_PEERING_RESOURCE @classmethod @@ -93,7 +97,8 @@ def detect(cls, standalone: bool, namespace: Optional[str], name: Optional[str], - **kwargs) -> Optional: + **kwargs: Any, + ) -> Optional["Peer"]: if standalone: return None @@ -114,7 +119,7 @@ def detect(cls, logger.warning(f"Default peering object not found, falling back to the standalone mode.") return None - def as_dict(self): + def as_dict(self) -> Dict[str, Any]: # Only the non-calculated and non-identifying fields. return { 'namespace': self.namespace, @@ -123,7 +128,7 @@ def as_dict(self): 'lifetime': self.lifetime.total_seconds(), } - def touch(self, *, lifetime: Optional[int] = None): + def touch(self, *, lifetime: Optional[int] = None) -> None: self.lastseen = datetime.datetime.utcnow() self.lifetime = (self.lifetime if lifetime is None else lifetime if isinstance(lifetime, datetime.timedelta) else @@ -131,14 +136,14 @@ def touch(self, *, lifetime: Optional[int] = None): self.deadline = self.lastseen + self.lifetime self.is_dead = self.deadline <= datetime.datetime.utcnow() - async def keepalive(self): + async def keepalive(self) -> None: """ Add a peer to the peers, and update its alive status. """ self.touch() await apply_peers([self], name=self.name, namespace=self.namespace, legacy=self.legacy) - async def disappear(self): + async def disappear(self) -> None: """ Remove a peer from the peers (gracefully). 
""" @@ -146,13 +151,13 @@ async def disappear(self): await apply_peers([self], name=self.name, namespace=self.namespace, legacy=self.legacy) @staticmethod - def _is_peering_exist(name: str, namespace: Optional[str]): + def _is_peering_exist(name: str, namespace: Optional[str]) -> bool: resource = CLUSTER_PEERING_RESOURCE if namespace is None else NAMESPACED_PEERING_RESOURCE obj = fetching.read_obj(resource=resource, namespace=namespace, name=name, default=None) return obj is not None @staticmethod - def _is_peering_legacy(name: str, namespace: Optional[str]): + def _is_peering_legacy(name: str, namespace: Optional[str]) -> bool: """ Legacy mode for the peering: cluster-scoped KopfPeering (new mode: namespaced). @@ -177,14 +182,15 @@ async def apply_peers( name: str, namespace: Union[None, str], legacy: bool = False, -): +) -> None: """ Apply the changes in the peers to the sync-object. The dead peers are removed, the new or alive peers are stored. Note: this does NOT change their `lastseen` field, so do it explicitly with ``touch()``. """ - patch = {'status': {peer.id: None if peer.is_dead else peer.as_dict() for peer in peers}} + patch = patches.Patch() + patch.update({'status': {peer.id: None if peer.is_dead else peer.as_dict() for peer in peers}}) resource = (LEGACY_PEERING_RESOURCE if legacy else CLUSTER_PEERING_RESOURCE if namespace is None else NAMESPACED_PEERING_RESOURCE) @@ -193,12 +199,12 @@ async def apply_peers( async def peers_handler( *, - event: Mapping, + event: bodies.Event, freeze: asyncio.Event, ourselves: Peer, autoclean: bool = True, replenished: asyncio.Event, -): +) -> None: """ Handle a single update of the peers by us or by other operators. @@ -214,7 +220,7 @@ async def peers_handler( body = event['object'] name = body.get('metadata', {}).get('name', None) namespace = body.get('metadata', {}).get('namespace', None) - if namespace != ourselves.namespace or name != ourselves.name: + if namespace != ourselves.namespace or name != ourselves.name or name is None: return # Find if we are still the highest priority operator. @@ -245,7 +251,7 @@ async def peers_handler( async def peers_keepalive( *, ourselves: Peer, -): +) -> NoReturn: """ An ever-running coroutine to regularly send our own keep-alive status for the peers. """ diff --git a/kopf/engines/posting.py b/kopf/engines/posting.py index 122e2319..11c6dbe7 100644 --- a/kopf/engines/posting.py +++ b/kopf/engines/posting.py @@ -17,18 +17,23 @@ import asyncio import sys from contextvars import ContextVar -from typing import Mapping, Text, NamedTuple +from typing import NamedTuple, NoReturn, Optional, Union, Iterator, Iterable, cast, TYPE_CHECKING from kopf import config from kopf.clients import events from kopf.structs import bodies from kopf.structs import dicts +if TYPE_CHECKING: + K8sEventQueue = asyncio.Queue["K8sEvent"] +else: + K8sEventQueue = asyncio.Queue + # Logging and event-posting can happen cross-thread: e.g. in sync-executors. # We have to remember our main event-loop with the queue consumer, to make # thread-safe coro calls both from inside that event-loop and from outside. event_queue_loop_var: ContextVar[asyncio.AbstractEventLoop] = ContextVar('event_queue_loop_var') -event_queue_var: ContextVar[asyncio.Queue] = ContextVar('event_queue_var') +event_queue_var: ContextVar[K8sEventQueue] = ContextVar('event_queue_var') class K8sEvent(NamedTuple): @@ -36,13 +41,18 @@ class K8sEvent(NamedTuple): A single k8s-event to be posted, with all ref-information preserved. 
It can exist and be posted even after the object is garbage-collected. """ - ref: Mapping - type: Text - reason: Text - message: Text - - -def enqueue(ref, type, reason, message): + ref: bodies.ObjectReference + type: str + reason: str + message: str + + +def enqueue( + ref: bodies.ObjectReference, + type: str, + reason: str, + message: str, +) -> None: loop = event_queue_loop_var.get() queue = event_queue_var.get() event = K8sEvent(ref=ref, type=type, reason=reason, message=message) @@ -50,6 +60,7 @@ def enqueue(ref, type, reason, message): # Events can be posted from another thread than the event-loop's thread # (e.g. from sync-handlers, or from explicitly started per-object threads), # or from the same thread (async-handlers and the framework itself). + running_loop: Optional[asyncio.AbstractEventLoop] try: running_loop = asyncio.get_running_loop() except RuntimeError: @@ -67,23 +78,45 @@ def enqueue(ref, type, reason, message): future.result() # block, wait, re-raise. -def event(objs, *, type, reason, message=''): - for obj in dicts.walk(objs): +def event( + objs: Union[bodies.Body, Iterable[bodies.Body]], + *, + type: str, + reason: str, + message: str = '', +) -> None: + for obj in cast(Iterator[bodies.Body], dicts.walk(objs)): ref = bodies.build_object_reference(obj) enqueue(ref=ref, type=type, reason=reason, message=message) -def info(obj, *, reason, message=''): +def info( + obj: bodies.Body, + *, + reason: str, + message: str = '', +) -> None: if config.EventsConfig.events_loglevel <= config.LOGLEVEL_INFO: event(obj, type='Normal', reason=reason, message=message) -def warn(obj, *, reason, message=''): +def warn( + obj: bodies.Body, + *, + reason: str, + message: str = '', +) -> None: if config.EventsConfig.events_loglevel <= config.LOGLEVEL_WARNING: event(obj, type='Warning', reason=reason, message=message) -def exception(obj, *, reason='', message='', exc=None): +def exception( + obj: bodies.Body, + *, + reason: str = '', + message: str = '', + exc: Optional[BaseException] = None, +) -> None: if exc is None: _, exc, _ = sys.exc_info() reason = reason if reason else type(exc).__name__ @@ -93,8 +126,8 @@ def exception(obj, *, reason='', message='', exc=None): async def poster( - event_queue: asyncio.Queue, -): + event_queue: K8sEventQueue, +) -> NoReturn: """ Post events in the background as they are queued. diff --git a/kopf/on.py b/kopf/on.py index 164b8ab3..15fd6a5d 100644 --- a/kopf/on.py +++ b/kopf/on.py @@ -12,11 +12,14 @@ def creation_handler(**kwargs): # TODO: add cluster=True support (different API methods) -from typing import Optional, Union, Tuple, List, Mapping +from typing import Optional, Callable, Union, Tuple, List from kopf.reactor import causation from kopf.reactor import handling from kopf.reactor import registries +from kopf.structs import bodies + +HandlerDecorator = Callable[[registries.HandlerFn], registries.HandlerFn] def resume( @@ -25,12 +28,13 @@ def resume( id: Optional[str] = None, timeout: Optional[float] = None, registry: Optional[registries.GlobalRegistry] = None, - labels: Optional[Mapping] = None, - annotations: Optional[Mapping] = None): + labels: Optional[bodies.Labels] = None, + annotations: Optional[bodies.Annotations] = None, +) -> HandlerDecorator: """ ``@kopf.on.resume()`` handler for the object resuming on operator (re)start. 
""" - registry = registry if registry is not None else registries.get_default_registry() - def decorator(fn): - registry.register_cause_handler( + actual_registry = registry if registry is not None else registries.get_default_registry() + def decorator(fn: registries.HandlerFn) -> registries.HandlerFn: + actual_registry.register_cause_handler( group=group, version=version, plural=plural, event=None, initial=True, id=id, timeout=timeout, fn=fn, labels=labels, annotations=annotations) @@ -44,12 +48,13 @@ def create( id: Optional[str] = None, timeout: Optional[float] = None, registry: Optional[registries.GlobalRegistry] = None, - labels: Optional[Mapping] = None, - annotations: Optional[Mapping] = None): + labels: Optional[bodies.Labels] = None, + annotations: Optional[bodies.Annotations] = None, +) -> HandlerDecorator: """ ``@kopf.on.create()`` handler for the object creation. """ - registry = registry if registry is not None else registries.get_default_registry() - def decorator(fn): - registry.register_cause_handler( + actual_registry = registry if registry is not None else registries.get_default_registry() + def decorator(fn: registries.HandlerFn) -> registries.HandlerFn: + actual_registry.register_cause_handler( group=group, version=version, plural=plural, event=causation.CREATE, id=id, timeout=timeout, fn=fn, labels=labels, annotations=annotations) @@ -63,12 +68,13 @@ def update( id: Optional[str] = None, timeout: Optional[float] = None, registry: Optional[registries.GlobalRegistry] = None, - labels: Optional[Mapping] = None, - annotations: Optional[Mapping] = None): + labels: Optional[bodies.Labels] = None, + annotations: Optional[bodies.Annotations] = None, +) -> HandlerDecorator: """ ``@kopf.on.update()`` handler for the object update or change. """ - registry = registry if registry is not None else registries.get_default_registry() - def decorator(fn): - registry.register_cause_handler( + actual_registry = registry if registry is not None else registries.get_default_registry() + def decorator(fn: registries.HandlerFn) -> registries.HandlerFn: + actual_registry.register_cause_handler( group=group, version=version, plural=plural, event=causation.UPDATE, id=id, timeout=timeout, fn=fn, labels=labels, annotations=annotations) @@ -83,12 +89,13 @@ def delete( timeout: Optional[float] = None, registry: Optional[registries.GlobalRegistry] = None, optional: Optional[bool] = None, - labels: Optional[Mapping] = None, - annotations: Optional[Mapping] = None): + labels: Optional[bodies.Labels] = None, + annotations: Optional[bodies.Annotations] = None, +) -> HandlerDecorator: """ ``@kopf.on.delete()`` handler for the object deletion. 
""" - registry = registry if registry is not None else registries.get_default_registry() - def decorator(fn): - registry.register_cause_handler( + actual_registry = registry if registry is not None else registries.get_default_registry() + def decorator(fn: registries.HandlerFn) -> registries.HandlerFn: + actual_registry.register_cause_handler( group=group, version=version, plural=plural, event=causation.DELETE, id=id, timeout=timeout, fn=fn, requires_finalizer=bool(not optional), @@ -104,12 +111,13 @@ def field( id: Optional[str] = None, timeout: Optional[float] = None, registry: Optional[registries.GlobalRegistry] = None, - labels: Optional[Mapping] = None, - annotations: Optional[Mapping] = None): + labels: Optional[bodies.Labels] = None, + annotations: Optional[bodies.Annotations] = None, +) -> HandlerDecorator: """ ``@kopf.on.field()`` handler for the individual field changes. """ - registry = registry if registry is not None else registries.get_default_registry() - def decorator(fn): - registry.register_cause_handler( + actual_registry = registry if registry is not None else registries.get_default_registry() + def decorator(fn: registries.HandlerFn) -> registries.HandlerFn: + actual_registry.register_cause_handler( group=group, version=version, plural=plural, event=None, field=field, id=id, timeout=timeout, fn=fn, labels=labels, annotations=annotations) @@ -122,12 +130,13 @@ def event( *, id: Optional[str] = None, registry: Optional[registries.GlobalRegistry] = None, - labels: Optional[Mapping] = None, - annotations: Optional[Mapping] = None): + labels: Optional[bodies.Labels] = None, + annotations: Optional[bodies.Annotations] = None, +) -> HandlerDecorator: """ ``@kopf.on.event()`` handler for the silent spies on the events. """ - registry = registry if registry is not None else registries.get_default_registry() - def decorator(fn): - registry.register_event_handler( + actual_registry = registry if registry is not None else registries.get_default_registry() + def decorator(fn: registries.HandlerFn) -> registries.HandlerFn: + actual_registry.register_event_handler( group=group, version=version, plural=plural, id=id, fn=fn, labels=labels, annotations=annotations) return fn @@ -140,7 +149,8 @@ def this( *, id: Optional[str] = None, timeout: Optional[float] = None, - registry: Optional[registries.SimpleRegistry] = None): + registry: Optional[registries.SimpleRegistry] = None, +) -> HandlerDecorator: """ ``@kopf.on.this()`` decorator for the dynamically generated sub-handlers. @@ -170,20 +180,20 @@ def create_task(*, spec, task=task, **kwargs): Note: ``task=task`` is needed to freeze the closure variable, so that every create function will have its own value, not the latest in the for-cycle. """ - registry = registry if registry is not None else handling.subregistry_var.get() - def decorator(fn): - registry.register(id=id, fn=fn, timeout=timeout) + actual_registry = registry if registry is not None else handling.subregistry_var.get() + def decorator(fn: registries.HandlerFn) -> registries.HandlerFn: + actual_registry.register(id=id, fn=fn, timeout=timeout) return fn return decorator def register( - fn, + fn: registries.HandlerFn, *, id: Optional[str] = None, timeout: Optional[float] = None, registry: Optional[registries.SimpleRegistry] = None, -): +) -> registries.HandlerFn: """ Register a function as a sub-handler of the currently executed handler. 
diff --git a/kopf/reactor/causation.py b/kopf/reactor/causation.py index 6ff23114..83c444c3 100644 --- a/kopf/reactor/causation.py +++ b/kopf/reactor/causation.py @@ -20,11 +20,13 @@ could execute on the yet-existing object (and its children, if created). """ import logging -from typing import NamedTuple, Text, Mapping, MutableMapping, Optional, Any, Union +from typing import Any, NamedTuple, Text, Optional, Union +from kopf.structs import bodies from kopf.structs import diffs from kopf.structs import finalizers from kopf.structs import lastseen +from kopf.structs import patches from kopf.structs import resources # Constants for event types, to prevent a direct usage of strings, and typos. @@ -65,17 +67,19 @@ class Cause(NamedTuple): resource: resources.Resource event: Text initial: bool - body: MutableMapping - patch: MutableMapping - diff: Optional[diffs.Diff] = None - old: Optional[Any] = None - new: Optional[Any] = None + body: bodies.Body + patch: patches.Patch + diff: diffs.Diff = diffs.EMPTY + old: Optional[bodies.BodyEssence] = None + new: Optional[bodies.BodyEssence] = None def detect_cause( - event: Mapping, + *, + event: bodies.Event, + diff: Optional[diffs.Diff] = None, requires_finalizer: bool = True, - **kwargs + **kwargs: Any, ) -> Cause: """ Detect the cause of the event to be handled. @@ -85,92 +89,60 @@ def detect_cause( which performs the actual handler invocation, logging, patching, and other side-effects. """ - diff = kwargs.get('diff') + + # Put them back to the pass-through kwargs (to avoid code duplication). body = event['object'] initial = event['type'] is None # special value simulated by us in kopf.reactor.watching. + kwargs.update(body=body, initial=initial) + if diff is not None: + kwargs.update(diff=diff) # The object was really deleted from the cluster. But we do not care anymore. if event['type'] == 'DELETED': - return Cause( - event=GONE, - body=body, - initial=initial, - **kwargs) + return Cause(event=GONE, **kwargs) # The finalizer has been just removed. We are fully done. if finalizers.is_deleted(body) and not finalizers.has_finalizers(body): - return Cause( - event=FREE, - body=body, - initial=initial, - **kwargs) + return Cause(event=FREE, **kwargs) if finalizers.is_deleted(body): - return Cause( - event=DELETE, - body=body, - initial=initial, - **kwargs) + return Cause(event=DELETE, **kwargs) # For a fresh new object, first block it from accidental deletions without our permission. # The actual handler will be called on the next call. # Only return this cause if the resource requires finalizers to be added. if requires_finalizer and not finalizers.has_finalizers(body): - return Cause( - event=ACQUIRE, - body=body, - initial=initial, - **kwargs) + return Cause(event=ACQUIRE, **kwargs) # Check whether or not the resource has finalizers, but doesn't require them. If this is # the case, then a resource may not be able to be deleted completely as finalizers may # not be removed by the operator under normal operation. We remove the finalizers first, # and any handler that should be called will be done on the next call. if not requires_finalizer and finalizers.has_finalizers(body): - return Cause( - event=RELEASE, - body=body, - initial=initial, - **kwargs) + return Cause(event=RELEASE, **kwargs) # For an object seen for the first time (i.e. just-created), call the creation handlers, # then mark the state as if it was seen when the creation has finished. 
- if not lastseen.has_state(body): - return Cause( - event=CREATE, - body=body, - initial=initial, - **kwargs) + if not lastseen.has_essence_stored(body): + return Cause(event=CREATE, **kwargs) # Cases with no state changes are usually ignored (NOOP). But for the "None" events, # as simulated for the initial listing, we call the resuming handlers (e.g. threads/tasks). if not diff and initial: - return Cause( - event=RESUME, - body=body, - initial=initial, - **kwargs) + return Cause(event=RESUME, **kwargs) # The previous step triggers one more patch operation without actual changes. Ignore it. # Either the last-seen state or the status field has changed. if not diff: - return Cause( - event=NOOP, - body=body, - initial=initial, - **kwargs) + return Cause(event=NOOP, **kwargs) # And what is left, is the update operation on one of the useful fields of the existing object. - return Cause( - event=UPDATE, - body=body, - initial=initial, - **kwargs) + return Cause(event=UPDATE, **kwargs) def enrich_cause( cause: Cause, - **kwargs + **kwargs: Any, ) -> Cause: """ Produce a new derived cause with some fields modified (). diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 346b204d..a8e33b0d 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -18,7 +18,7 @@ import collections.abc import datetime from contextvars import ContextVar -from typing import Optional, Callable, Iterable, Collection +from typing import Optional, Iterable, Collection, Any from kopf.clients import patching from kopf.engines import logging as logging_engine @@ -26,12 +26,15 @@ from kopf.engines import sleeping from kopf.reactor import causation from kopf.reactor import invocation +from kopf.reactor import lifecycles from kopf.reactor import registries from kopf.reactor import state +from kopf.structs import bodies from kopf.structs import dicts from kopf.structs import diffs from kopf.structs import finalizers from kopf.structs import lastseen +from kopf.structs import patches from kopf.structs import resources WAITING_KEEPALIVE_INTERVAL = 10 * 60 @@ -47,8 +50,11 @@ class PermanentError(Exception): class TemporaryError(Exception): """ A potentially recoverable error, should be retried. """ - def __init__(self, *args, delay=DEFAULT_RETRY_DELAY, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, + __msg: Optional[str] = None, + delay: Optional[float] = DEFAULT_RETRY_DELAY, + ): + super().__init__(__msg) self.delay = delay @@ -62,7 +68,7 @@ class HandlerChildrenRetry(TemporaryError): # The task-local context; propagated down the stack instead of multiple kwargs. # Used in `@kopf.on.this` and `kopf.execute()` to add/get the sub-handlers. -sublifecycle_var: ContextVar[Callable] = ContextVar('sublifecycle_var') +sublifecycle_var: ContextVar[lifecycles.LifeCycleFn] = ContextVar('sublifecycle_var') subregistry_var: ContextVar[registries.SimpleRegistry] = ContextVar('subregistry_var') subexecuted_var: ContextVar[bool] = ContextVar('subexecuted_var') handler_var: ContextVar[registries.Handler] = ContextVar('handler_var') @@ -70,13 +76,13 @@ class HandlerChildrenRetry(TemporaryError): async def custom_object_handler( - lifecycle: Callable, + lifecycle: lifecycles.LifeCycleFn, registry: registries.GlobalRegistry, resource: resources.Resource, - event: dict, + event: bodies.Event, freeze: asyncio.Event, replenished: asyncio.Event, - event_queue: asyncio.Queue, + event_queue: posting.K8sEventQueue, ) -> None: """ Handle a single custom object low-level watch-event. 
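The reshaped `TemporaryError.__init__` above (message positional, `delay=` by keyword) matches how operator handlers raise it. A usage sketch, assuming the top-level alias `kopf.TemporaryError` as in kopf's docs, and kopf's example resource names:

    import kopf

    @kopf.on.create('zalando.org', 'v1', 'kopfexamples')
    def create_fn(retry: int, **_: object):
        if retry < 3:
            # Ask the framework to retry this handler after a back-off delay.
            raise kopf.TemporaryError("The object is not ready yet.", delay=30)
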
@@ -87,9 +93,9 @@ async def custom_object_handler(
     All the internally provoked changes are intercepted, do not create causes,
     and therefore do not call the handling logic.
     """
-    body = event['object']
-    delay = None
-    patch = {}
+    body: bodies.Body = event['object']
+    patch: patches.Patch = patches.Patch()
+    delay: Optional[float] = None
 
     # Each object has its own prefixed logger, to distinguish parallel handling.
     logger = logging_engine.ObjectLogger(body=body)
@@ -109,7 +115,7 @@ async def custom_object_handler(
         # Detect the cause and handle it (or at least log this happened).
         if registry.has_cause_handlers(resource=resource):
             extra_fields = registry.get_extra_fields(resource=resource)
-            old, new, diff = lastseen.get_state_diffs(body=body, extra_fields=extra_fields)
+            old, new, diff = lastseen.get_essential_diffs(body=body, extra_fields=extra_fields)
             cause = causation.detect_cause(
                 event=event,
                 resource=resource,
@@ -136,7 +142,8 @@ async def custom_object_handler(
         if unslept is not None:
             logger.debug(f"Sleeping was interrupted by new changes, {unslept} seconds left.")
         else:
-            dummy = {'status': {'kopf': {'dummy': datetime.datetime.utcnow().isoformat()}}}
+            now = datetime.datetime.utcnow()
+            dummy = patches.Patch({'status': {'kopf': {'dummy': now.isoformat()}}})
             logger.debug("Provoking reaction with: %r", dummy)
             await patching.patch_obj(resource=resource, patch=dummy, body=body)
 
@@ -145,9 +152,9 @@ async def handle_event(
         registry: registries.BaseRegistry,
         resource: resources.Resource,
         logger: logging_engine.ObjectLogger,
-        patch: dict,
-        event: dict,
-):
+        patch: patches.Patch,
+        event: bodies.Event,
+) -> None:
     """
     Handle a received event, log but ignore all errors.
 
@@ -182,10 +189,10 @@ async def handle_event(
 
 
 async def handle_cause(
-        lifecycle: Callable,
+        lifecycle: lifecycles.LifeCycleFn,
         registry: registries.BaseRegistry,
         cause: causation.Cause,
-):
+) -> Optional[float]:
     """
     Handle a detected cause, as part of the bigger handler routine.
     """
@@ -224,7 +231,7 @@ async def handle_cause(
         # Regular causes also do some implicit post-handling when all handlers are done.
         if done or skip:
             extra_fields = registry.get_extra_fields(resource=cause.resource)
-            lastseen.refresh_state(body=body, patch=patch, extra_fields=extra_fields)
+            lastseen.refresh_essence(body=body, patch=patch, extra_fields=extra_fields)
             if done:
                 state.purge_progress(body=body, patch=patch)
             if cause.event == causation.DELETE:
@@ -262,11 +269,11 @@ async def handle_cause(
 
 async def execute(
         *,
-        fns: Optional[Iterable[Callable]] = None,
+        fns: Optional[Iterable[invocation.Invokable]] = None,
         handlers: Optional[Iterable[registries.Handler]] = None,
         registry: Optional[registries.BaseRegistry] = None,
-        lifecycle: Callable = None,
-        cause: causation.Cause = None,
+        lifecycle: Optional[lifecycles.LifeCycleFn] = None,
+        cause: Optional[causation.Cause] = None,
 ) -> None:
     """
     Execute the handlers in an isolated lifecycle.
@@ -286,8 +293,12 @@ async def execute(
 
     # Restore the current context as set in the handler execution cycle.
     lifecycle = lifecycle if lifecycle is not None else sublifecycle_var.get()
-    handler = handler_var.get(None)
     cause = cause if cause is not None else cause_var.get()
+    handler: Optional[registries.Handler]
+    try:
+        handler = handler_var.get()
+    except LookupError:
+        handler = None
 
     # Validate the inputs; the function signatures cannot put this kind of restriction, so we do.
     if len([v for v in [fns, handlers, registry] if v is not None]) > 1:
@@ -334,7 +345,7 @@ async def _execute(
-        lifecycle: Callable,
+        lifecycle: lifecycles.LifeCycleFn,
         handlers: Collection[registries.Handler],
         cause: causation.Cause,
         retry_on_errors: bool = True,
@@ -374,7 +385,7 @@ async def _execute(
         # Restore the handler's progress status. It can be useful in the handlers.
         retry = state.get_retry_count(body=cause.body, handler=handler)
         started = state.get_start_time(body=cause.body, handler=handler, patch=cause.patch)
-        runtime = datetime.datetime.utcnow() - started
+        runtime = datetime.datetime.utcnow() - (started if started else datetime.datetime.utcnow())
 
         # The exceptions are handled locally and are not re-raised, to keep the operator running.
         try:
@@ -441,19 +452,21 @@ async def _execute(
     # Other (non-delayed) handlers will continue normally, due to the raise a few lines above.
     # Other objects will continue normally in their own handling asyncio tasks.
     if handlers_wait:
-        times = [state.get_awake_time(body=cause.body, handler=handler) for handler in handlers_wait]
-        until = min(times)  # the soonest awake datetime.
-        delay = (until - datetime.datetime.utcnow()).total_seconds()
-        delay = max(0, min(WAITING_KEEPALIVE_INTERVAL, delay))
+        now = datetime.datetime.utcnow()
+        limit = now + datetime.timedelta(seconds=WAITING_KEEPALIVE_INTERVAL)
+        times = [state.get_awake_time(body=cause.body, handler=h) for h in handlers_wait]
+        until = min([t for t in times if t is not None] + [limit])  # the soonest awake datetime.
+        delay = max(0, (until - now).total_seconds())
         raise HandlerChildrenRetry(delay=delay)
 
 
 async def _call_handler(
         handler: registries.Handler,
-        *args,
+        *args: Any,
         cause: causation.Cause,
-        lifecycle: Callable,
-        **kwargs):
+        lifecycle: lifecycles.LifeCycleFn,
+        **kwargs: Any,
+) -> Any:
     """
     Invoke one handler only, according to the calling conventions.
 
diff --git a/kopf/reactor/invocation.py b/kopf/reactor/invocation.py
index 0f0a9e6a..c59c2f9b 100644
--- a/kopf/reactor/invocation.py
+++ b/kopf/reactor/invocation.py
@@ -8,16 +8,25 @@
 import asyncio
 import contextvars
 import functools
-from typing import Callable
+from typing import Optional, Any, Union
 
 from kopf import config
+from kopf.reactor import causation
+from kopf.reactor import lifecycles
+from kopf.reactor import registries
+from kopf.structs import bodies
 from kopf.structs import dicts
 
+Invokable = Union[lifecycles.LifeCycleFn, registries.HandlerFn]
+
 
 async def invoke(
-        fn: Callable,
-        *args,
-        **kwargs):
+        fn: Invokable,
+        *args: Any,
+        event: Optional[bodies.Event] = None,
+        cause: Optional[causation.Cause] = None,
+        **kwargs: Any,
+) -> Any:
     """
     Invoke a single function, but safely for the main asyncio process.
 
@@ -33,9 +42,9 @@ async def invoke(
     """
 
     # Add aliases for the kwargs, directly linked to the body, or to the assumed defaults.
- if 'event' in kwargs: - event = kwargs.get('event') + if event is not None: kwargs.update( + event=event, type=event['type'], body=event['object'], spec=dicts.DictView(event['object'], 'spec'), @@ -45,9 +54,9 @@ async def invoke( name=event['object'].get('metadata', {}).get('name'), namespace=event['object'].get('metadata', {}).get('namespace'), ) - if 'cause' in kwargs: - cause = kwargs.get('cause') + if cause is not None: kwargs.update( + cause=cause, event=cause.event, body=cause.body, diff=cause.diff, @@ -64,7 +73,7 @@ async def invoke( ) if is_async_fn(fn): - result = await fn(*args, **kwargs) + result = await fn(*args, **kwargs) # type: ignore else: # Not that we want to use functools, but for executors kwargs, it is officially recommended: @@ -81,12 +90,14 @@ async def invoke( return result -def is_async_fn(fn): +def is_async_fn( + fn: Optional[Invokable], +) -> bool: if fn is None: - return None + return False elif isinstance(fn, functools.partial): return is_async_fn(fn.func) elif hasattr(fn, '__wrapped__'): # @functools.wraps() - return is_async_fn(fn.__wrapped__) + return is_async_fn(fn.__wrapped__) # type: ignore else: return asyncio.iscoroutinefunction(fn) diff --git a/kopf/reactor/lifecycles.py b/kopf/reactor/lifecycles.py index 0002e3d1..7fd79ec5 100644 --- a/kopf/reactor/lifecycles.py +++ b/kopf/reactor/lifecycles.py @@ -11,47 +11,70 @@ import logging import random +from typing import Sequence, Any, Optional +from typing_extensions import Protocol + +from kopf.reactor import registries from kopf.reactor import state +from kopf.structs import bodies logger = logging.getLogger(__name__) +Handlers = Sequence[registries.Handler] + + +class LifeCycleFn(Protocol): + """ + A callback type for handlers selection based on the event/cause. + + It is basically `Invokable` extended with an additional positional parameter + and specific return type. But we cannot express it with `typing`. + For the names and types of kwargs, see `Invokable`. + """ + def __call__(self, + handlers: Handlers, + *, + body: bodies.Body, + **kwargs: Any, + ) -> Handlers: ... + -def all_at_once(handlers, **kwargs): +def all_at_once(handlers: Handlers, **kwargs: Any) -> Handlers: """ Execute all handlers at once, in one event reaction cycle, if possible. """ return handlers -def one_by_one(handlers, **kwargs): +def one_by_one(handlers: Handlers, **kwargs: Any) -> Handlers: """ Execute handlers one at a time, in the order they were registered. """ return handlers[:1] -def randomized(handlers, **kwargs): +def randomized(handlers: Handlers, **kwargs: Any) -> Handlers: """ Execute one handler at a time, in the random order. """ return [random.choice(handlers)] if handlers else [] -def shuffled(handlers, **kwargs): +def shuffled(handlers: Handlers, **kwargs: Any) -> Handlers: """ Execute all handlers at once, but in the random order. """ return random.sample(handlers, k=len(handlers)) if handlers else [] -def asap(handlers, *, body, **kwargs): +def asap(handlers: Handlers, *, body: bodies.Body, **kwargs: Any) -> Handlers: """ Execute one handler at a time, skip on failure, try the next one, retry after the full cycle. 
""" retryfn = lambda handler: state.get_retry_count(body=body, handler=handler) return sorted(handlers, key=retryfn)[:1] -_default_lifecycle = None +_default_lifecycle: LifeCycleFn = asap -def get_default_lifecycle(): - return _default_lifecycle if _default_lifecycle is not None else asap +def get_default_lifecycle() -> LifeCycleFn: + return _default_lifecycle -def set_default_lifecycle(lifecycle): +def set_default_lifecycle(lifecycle: Optional[LifeCycleFn]) -> None: global _default_lifecycle if _default_lifecycle is not None: logger.warning(f"The default lifecycle is already set to {_default_lifecycle}, overriding it to {lifecycle}.") - _default_lifecycle = lifecycle + _default_lifecycle = lifecycle if lifecycle is not None else asap diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index ec9ea052..9d83ec06 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -27,26 +27,42 @@ import enum import logging import time -from typing import Callable, Tuple, Union, MutableMapping, NewType, NamedTuple +from typing import Tuple, Union, MutableMapping, NewType, NamedTuple, TYPE_CHECKING, cast import aiojobs +from typing_extensions import Protocol from kopf import config from kopf.clients import watching +from kopf.structs import bodies from kopf.structs import resources logger = logging.getLogger(__name__) +class WatcherCallback(Protocol): + async def __call__(self, + *, + event: bodies.Event, + replenished: asyncio.Event, + ) -> None: ... + + # An end-of-stream marker sent from the watcher to the workers. # See: https://www.python.org/dev/peps/pep-0484/#support-for-singleton-types-in-unions class EOS(enum.Enum): token = enum.auto() +if TYPE_CHECKING: + WatchEventQueue = asyncio.Queue[Union[bodies.Event, EOS]] +else: + WatchEventQueue = asyncio.Queue + + class Stream(NamedTuple): """ A single object's stream of watch-events, with some extra helpers. """ - watchevents: asyncio.Queue + watchevents: WatchEventQueue replenished: asyncio.Event # means: "hurry up, there are new events queued again" @@ -59,8 +75,8 @@ class Stream(NamedTuple): async def watcher( namespace: Union[None, str], resource: resources.Resource, - handler: Callable, -): + handler: WatcherCallback, +) -> None: """ The watchers watches for the resource events via the API, and spawns the handlers for every object. @@ -82,7 +98,7 @@ async def watcher( # Either use the existing object's queue, or create a new one together with the per-object job. # "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done. async for event in watching.infinite_watch(resource=resource, namespace=namespace): - key = (resource, event['object']['metadata']['uid']) + key = cast(ObjectRef, (resource, event['object']['metadata']['uid'])) try: streams[key].replenished.set() # interrupt current sleeps, if any. await streams[key].watchevents.put(event) @@ -100,10 +116,10 @@ async def watcher( async def worker( - handler: Callable, + handler: WatcherCallback, streams: Streams, key: ObjectRef, -): +) -> None: """ The per-object workers consume the object's events and invoke the handler. @@ -168,7 +184,11 @@ async def worker( pass -async def _wait_for_depletion(*, scheduler: aiojobs.Scheduler, streams: Streams): +async def _wait_for_depletion( + *, + scheduler: aiojobs.Scheduler, + streams: Streams, +) -> None: # Notify all the workers to finish now. Wake them up if they are waiting in the queue-getting. 
     for stream in streams.values():
diff --git a/kopf/reactor/registries.py b/kopf/reactor/registries.py
index 743f1fdc..1c080b8c 100644
--- a/kopf/reactor/registries.py
+++ b/kopf/reactor/registries.py
@@ -13,22 +13,56 @@
 """
 import abc
 import functools
+import logging
 from types import FunctionType, MethodType
-from typing import MutableMapping, NamedTuple, Text, Optional, Tuple, Callable, Mapping
+from typing import (Any, MutableMapping, Optional, Sequence, Collection, Iterable, Iterator,
+                    NamedTuple, Union, Tuple, List, Set, FrozenSet, Mapping, NewType, cast)
+
+from typing_extensions import Protocol
 
+from kopf.reactor import causation
+from kopf.structs import bodies
+from kopf.structs import dicts
+from kopf.structs import diffs
+from kopf.structs import patches
 from kopf.structs import resources as resources_
 
+# Strings are taken from the users, but then tainted as this type for stricter type-checking:
+# to prevent usage of some other strings (e.g. an operator id) as the handler ids.
+HandlerId = NewType('HandlerId', str)
+
+
+class HandlerFn(Protocol):
+    def __call__(self,
+                 *args: Any,
+                 type: str,
+                 event: Union[str, bodies.Event],  # FIXME: or str for cause-handlers.
+                 body: bodies.Body,
+                 meta: bodies.Meta,
+                 spec: bodies.Spec,
+                 status: bodies.Status,
+                 uid: str,
+                 name: str,
+                 namespace: Optional[str],
+                 patch: patches.Patch,
+                 logger: Union[logging.Logger, logging.LoggerAdapter],
+                 diff: diffs.Diff,  # TODO:? Optional[diffs.Diff]?
+                 old: bodies.Body,  # TODO: or Any for field-handlers.
+                 new: bodies.Body,  # TODO: or Any for field-handlers.
+                 **kwargs: Any,
+                 ) -> Any: ...
+
 
 # A registered handler (function + event meta info).
 class Handler(NamedTuple):
-    fn: Callable
-    id: Text
-    event: Text
+    fn: HandlerFn
+    id: HandlerId
+    event: Optional[str]
     field: Optional[Tuple[str, ...]]
     timeout: Optional[float] = None
     initial: Optional[bool] = None
-    labels: Optional[Mapping] = None
-    annotations: Optional[Mapping] = None
+    labels: Optional[bodies.Labels] = None
+    annotations: Optional[bodies.Annotations] = None
 
 
 class BaseRegistry(metaclass=abc.ABCMeta):
@@ -36,29 +70,39 @@ class BaseRegistry(metaclass=abc.ABCMeta):
     A registry stores the handlers and provides them to the reactor.
     """
 
-    def get_cause_handlers(self, cause):
+    def get_cause_handlers(self,
+                           cause: causation.Cause,
+                           ) -> Sequence[Handler]:
         return list(self._deduplicated(self.iter_cause_handlers(cause=cause)))
 
     @abc.abstractmethod
-    def iter_cause_handlers(self, cause):
+    def iter_cause_handlers(self,
+                            cause: causation.Cause,
+                            ) -> Iterator[Handler]:
         pass
 
-    def get_event_handlers(self, resource, event):
+    def get_event_handlers(self,
+                           resource: resources_.Resource,
+                           event: bodies.Event,
+                           ) -> Sequence[Handler]:
         return list(self._deduplicated(self.iter_event_handlers(resource=resource, event=event)))
 
     @abc.abstractmethod
-    def iter_event_handlers(self, resource, event):
+    def iter_event_handlers(self,
+                            resource: resources_.Resource,
+                            event: bodies.Event,
+                            ) -> Iterator[Handler]:
         pass
 
-    def get_extra_fields(self, resource):
+    def get_extra_fields(self, resource: resources_.Resource) -> Set[dicts.FieldPath]:
         return set(self.iter_extra_fields(resource=resource))
 
     @abc.abstractmethod
-    def iter_extra_fields(self, resource):
+    def iter_extra_fields(self, resource: resources_.Resource) -> Iterator[dicts.FieldPath]:
         pass
 
     @staticmethod
-    def _deduplicated(handlers):
+    def _deduplicated(handlers: Iterable[Handler]) -> Iterator[Handler]:
         """
         Yield the handlers deduplicated.
@@ -79,7 +123,7 @@ def fn(**kwargs): pass
        handled) **AND** it is detected as pre-existing before operator start.
        But `fn()` should be called only once for this cause.
        """
-        seen_ids = set()
+        seen_ids: Set[int] = set()
         for handler in handlers:
             if id(handler.fn) in seen_ids:
                 pass
@@ -93,20 +137,29 @@ class SimpleRegistry(BaseRegistry):
     A simple registry is just a list of handlers, no grouping.
     """
 
-    def __init__(self, prefix=None):
+    def __init__(self, prefix: Optional[str] = None) -> None:
         super().__init__()
         self.prefix = prefix
-        self._handlers = []  # [Handler, ...]
-        self._handlers_requiring_finalizer = []
+        self._handlers: List[Handler] = []  # [Handler, ...]
+        self._handlers_requiring_finalizer: List[Handler] = []
 
-    def __bool__(self):
+    def __bool__(self) -> bool:
         return bool(self._handlers)
 
-    def append(self, handler):
+    def append(self, handler: Handler) -> None:
         self._handlers.append(handler)
 
-    def register(self, fn, id=None, event=None, field=None, timeout=None, initial=None, requires_finalizer=False,
-                 labels=None, annotations=None):
+    def register(self,
+                 fn: HandlerFn,
+                 id: Optional[str] = None,
+                 event: Optional[str] = None,
+                 field: Optional[dicts.FieldSpec] = None,
+                 timeout: Optional[float] = None,
+                 initial: Optional[bool] = None,
+                 requires_finalizer: bool = False,
+                 labels: Optional[bodies.Labels] = None,
+                 annotations: Optional[bodies.Annotations] = None,
+                 ) -> HandlerFn:
         if field is None:
             field = None  # for the non-field events
         elif isinstance(field, str):
@@ -116,11 +169,12 @@ def register(self, fn, id=None, event=None, field=None, timeout=None, initial=No
         else:
             raise ValueError(f"Field must be either a str, or a list/tuple. Got {field!r}")
 
-        id = id if id is not None else get_callable_id(fn)
-        id = id if field is None else f'{id}/{".".join(field)}'
-        id = id if self.prefix is None else f'{self.prefix}/{id}'
-        handler = Handler(id=id, fn=fn, event=event, field=field, timeout=timeout, initial=initial,
-                          labels=labels, annotations=annotations)
+        real_id: HandlerId
+        real_id = cast(HandlerId, id) if id is not None else cast(HandlerId, get_callable_id(fn))
+        real_id = real_id if field is None else cast(HandlerId, f'{real_id}/{".".join(field)}')
+        real_id = real_id if self.prefix is None else cast(HandlerId, f'{self.prefix}/{real_id}')
+        handler = Handler(id=real_id, fn=fn, event=event, field=field, timeout=timeout,
+                          initial=initial, labels=labels, annotations=annotations)
         self.append(handler)
 
 
@@ -129,8 +183,10 @@ def register(self, fn, id=None, event=None, field=None, timeout=None, initial=No
 
         return fn  # to be usable as a decorator too.
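Returning ``fn`` from ``register()`` is what makes the same method usable both as a plain call and as a decorator. A stripped-down sketch of this idiom (simplified; the real registry also computes the ids and applies the filters)::

    from typing import Callable, List, TypeVar

    F = TypeVar('F', bound=Callable[..., object])

    class Registry:
        def __init__(self) -> None:
            self._handlers: List[Callable[..., object]] = []

        def register(self, fn: F) -> F:
            self._handlers.append(fn)
            return fn  # returning fn unchanged enables the decorator form

    registry = Registry()

    @registry.register
    def my_handler(**kwargs: object) -> None:
        pass

    assert registry._handlers == [my_handler]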
-    def iter_cause_handlers(self, cause):
-        changed_fields = {field for _, field, _, _ in cause.diff or []}
+    def iter_cause_handlers(self,
+                            cause: causation.Cause,
+                            ) -> Iterator[Handler]:
+        changed_fields = frozenset(field for _, field, _, _ in cause.diff or [])
         for handler in self._handlers:
             if handler.event is None or handler.event == cause.event:
                 if handler.initial and not cause.initial:
@@ -138,17 +194,25 @@ def iter_cause_handlers(self, cause):
             elif match(handler=handler, body=cause.body, changed_fields=changed_fields):
                 yield handler
 
-    def iter_event_handlers(self, resource, event):
+    def iter_event_handlers(self,
+                            resource: resources_.Resource,
+                            event: bodies.Event,
+                            ) -> Iterator[Handler]:
         for handler in self._handlers:
             if match(handler=handler, body=event['object']):
                 yield handler
 
-    def iter_extra_fields(self, resource):
+    def iter_extra_fields(self,
+                          resource: resources_.Resource,
+                          ) -> Iterator[dicts.FieldPath]:
         for handler in self._handlers:
             if handler.field:
                 yield handler.field
 
-    def requires_finalizer(self, resource, body):
+    def requires_finalizer(self,
+                           resource: resources_.Resource,
+                           body: bodies.Body,
+                           ) -> bool:
         # check whether the body matches a deletion handler
         for handler in self._handlers_requiring_finalizer:
             if match(handler=handler, body=body):
@@ -157,14 +221,14 @@ def requires_finalizer(self, resource, body):
         return False
 
 
-def get_callable_id(c):
+def get_callable_id(c: HandlerFn) -> str:
     """ Get a reasonably good id of any commonly used callable. """
     if c is None:
-        return None
+        raise ValueError("Cannot build a persistent id of None.")
     elif isinstance(c, functools.partial):
         return get_callable_id(c.func)
     elif hasattr(c, '__wrapped__'):  # @functools.wraps()
-        return get_callable_id(c.__wrapped__)
+        return get_callable_id(getattr(c, '__wrapped__'))
     elif isinstance(c, FunctionType) and c.__name__ == '<lambda>':
         # The best we can do to keep the id stable across the process restarts,
         # assuming at least no code changes. The code changes are not detectable.
@@ -172,7 +236,7 @@ def get_callable_id(c):
         path = c.__code__.co_filename
         return f'lambda:{path}:{line}'
     elif isinstance(c, (FunctionType, MethodType)):
-        return getattr(c, '__qualname__', getattr(c, '__name__', repr(c)))
+        return str(getattr(c, '__qualname__', getattr(c, '__name__', repr(c))))
     else:
         raise ValueError(f"Cannot get id of {c!r}.")
 
@@ -183,14 +247,25 @@ class GlobalRegistry(BaseRegistry):
     It is usually populated by the `@kopf.on...` decorators.
     """
 
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__()
         self._cause_handlers: MutableMapping[resources_.Resource, SimpleRegistry] = {}
         self._event_handlers: MutableMapping[resources_.Resource, SimpleRegistry] = {}
 
-    def register_cause_handler(self, group, version, plural, fn,
-                               id=None, event=None, field=None, timeout=None, initial=None, requires_finalizer=False,
-                               labels=None, annotations=None):
+    def register_cause_handler(self,
+                               group: str,
+                               version: str,
+                               plural: str,
+                               fn: HandlerFn,
+                               id: Optional[str] = None,
+                               event: Optional[str] = None,
+                               field: Optional[dicts.FieldSpec] = None,
+                               timeout: Optional[float] = None,
+                               initial: Optional[bool] = None,
+                               requires_finalizer: bool = False,
+                               labels: Optional[bodies.Labels] = None,
+                               annotations: Optional[bodies.Annotations] = None,
+                               ) -> HandlerFn:
        """
        Register an additional handler function for the specific resource and specific event.
""" @@ -200,8 +275,15 @@ def register_cause_handler(self, group, version, plural, fn, labels=labels, annotations=annotations) return fn # to be usable as a decorator too. - def register_event_handler(self, group, version, plural, fn, id=None, labels=None, - annotations=None): + def register_event_handler(self, + group: str, + version: str, + plural: str, + fn: HandlerFn, + id: Optional[str] = None, + labels: Optional[bodies.Labels] = None, + annotations: Optional[bodies.Annotations] = None, + ) -> HandlerFn: """ Register an additional handler function for low-level events. """ @@ -211,19 +293,25 @@ def register_event_handler(self, group, version, plural, fn, id=None, labels=Non return fn # to be usable as a decorator too. @property - def resources(self): + def resources(self) -> FrozenSet[resources_.Resource]: """ All known resources in the registry. """ return frozenset(self._cause_handlers) | frozenset(self._event_handlers) - def has_cause_handlers(self, resource): + def has_cause_handlers(self, + resource: resources_.Resource, + ) -> bool: resource_registry = self._cause_handlers.get(resource, None) return bool(resource_registry) - def has_event_handlers(self, resource): + def has_event_handlers(self, + resource: resources_.Resource, + ) -> bool: resource_registry = self._event_handlers.get(resource, None) return bool(resource_registry) - def iter_cause_handlers(self, cause): + def iter_cause_handlers(self, + cause: causation.Cause, + ) -> Iterator[Handler]: """ Iterate all handlers that match this cause/event, in the order they were registered (even if mixed). """ @@ -231,7 +319,10 @@ def iter_cause_handlers(self, cause): if resource_registry is not None: yield from resource_registry.iter_cause_handlers(cause=cause) - def iter_event_handlers(self, resource, event): + def iter_event_handlers(self, + resource: resources_.Resource, + event: bodies.Event, + ) -> Iterator[Handler]: """ Iterate all handlers for the low-level events. """ @@ -239,12 +330,17 @@ def iter_event_handlers(self, resource, event): if resource_registry is not None: yield from resource_registry.iter_event_handlers(resource=resource, event=event) - def iter_extra_fields(self, resource): + def iter_extra_fields(self, + resource: resources_.Resource, + ) -> Iterator[dicts.FieldPath]: resource_registry = self._cause_handlers.get(resource, None) if resource_registry is not None: yield from resource_registry.iter_extra_fields(resource=resource) - def requires_finalizer(self, resource, body): + def requires_finalizer(self, + resource: resources_.Resource, + body: bodies.Body, + ) -> bool: """ Return whether a finalizer should be added to the given resource or not. @@ -255,7 +351,7 @@ def requires_finalizer(self, resource, body): return resource_registry.requires_finalizer(resource, body) -_default_registry = GlobalRegistry() +_default_registry: GlobalRegistry = GlobalRegistry() def get_default_registry() -> GlobalRegistry: @@ -266,7 +362,7 @@ def get_default_registry() -> GlobalRegistry: return _default_registry -def set_default_registry(registry: GlobalRegistry): +def set_default_registry(registry: GlobalRegistry) -> None: """ Set the default registry to be used by the decorators and the reactor unless the explicit registry is provided to them. 
@@ -275,36 +371,54 @@ def set_default_registry(registry: GlobalRegistry): _default_registry = registry -def match(handler, body, changed_fields=None): - return ( - (not handler.field or _matches_field(handler, changed_fields or [])) and - (not handler.labels or _matches_labels(handler, body)) and - (not handler.annotations or _matches_annotations(handler, body)) - ) - - -def _matches_field(handler, changed_fields): - return any(field[:len(handler.field)] == handler.field for field in changed_fields) - - -def _matches_labels(handler, body): - return _matches_metadata(handler=handler, body=body, metadata_type='labels') - - -def _matches_annotations(handler, body): - return _matches_metadata(handler=handler, body=body, metadata_type='annotations') - - -def _matches_metadata(handler, body, metadata_type): - metadata = getattr(handler, metadata_type) - object_metadata = body.get('metadata', {}).get(metadata_type, {}) - - for key, value in metadata.items(): - if key not in object_metadata: +def match( + handler: Handler, + body: bodies.Body, + changed_fields: Collection[dicts.FieldPath] = frozenset(), +) -> bool: + return all([ + _matches_field(handler, changed_fields or {}), + _matches_labels(handler, body), + _matches_annotations(handler, body), + ]) + + +def _matches_field( + handler: Handler, + changed_fields: Collection[dicts.FieldPath] = frozenset(), +) -> bool: + return (not handler.field or + any(field[:len(handler.field)] == handler.field for field in changed_fields)) + + +def _matches_labels( + handler: Handler, + body: bodies.Body, +) -> bool: + return (not handler.labels or + _matches_metadata(pattern=handler.labels, + content=body.get('metadata', {}).get('labels', {}))) + + +def _matches_annotations( + handler: Handler, + body: bodies.Body, +) -> bool: + return (not handler.annotations or + _matches_metadata(pattern=handler.annotations, + content=body.get('metadata', {}).get('annotations', {}))) + + +def _matches_metadata( + *, + pattern: Mapping[str, str], # from the handler + content: Mapping[str, str], # from the body +) -> bool: + for key, value in pattern.items(): + if key not in content: return False - elif value is not None and value != object_metadata[key]: + elif value is not None and value != content[key]: return False else: continue - return True diff --git a/kopf/reactor/running.py b/kopf/reactor/running.py index afac9feb..9736a4ad 100644 --- a/kopf/reactor/running.py +++ b/kopf/reactor/running.py @@ -5,7 +5,8 @@ import signal import threading import warnings -from typing import Optional, Callable, Collection, Union +from typing import (Optional, Collection, Union, Tuple, Set, Text, Any, Coroutine, + cast, TYPE_CHECKING) from kopf.engines import peering from kopf.engines import posting @@ -14,20 +15,28 @@ from kopf.reactor import queueing from kopf.reactor import registries -Flag = Union[asyncio.Future, asyncio.Event, concurrent.futures.Future, threading.Event] +if TYPE_CHECKING: + asyncio_Task = asyncio.Task[None] + asyncio_Future = asyncio.Future[Any] +else: + asyncio_Task = asyncio.Task + asyncio_Future = asyncio.Future + +Flag = Union[asyncio_Future, asyncio.Event, concurrent.futures.Future, threading.Event] +Tasks = Collection[asyncio_Task] logger = logging.getLogger(__name__) def run( loop: Optional[asyncio.AbstractEventLoop] = None, - lifecycle: Optional[Callable] = None, + lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.GlobalRegistry] = None, standalone: bool = False, priority: int = 0, peering_name: Optional[str] = None, 
namespace: Optional[str] = None, -): +) -> None: """ Run the whole operator synchronously. @@ -48,7 +57,7 @@ def run( async def operator( - lifecycle: Optional[Callable] = None, + lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.GlobalRegistry] = None, standalone: bool = False, priority: int = 0, @@ -56,7 +65,7 @@ async def operator( namespace: Optional[str] = None, stop_flag: Optional[Flag] = None, ready_flag: Optional[Flag] = None, -): +) -> None: """ Run the whole operator asynchronously. @@ -80,7 +89,7 @@ async def operator( async def spawn_tasks( - lifecycle: Optional[Callable] = None, + lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.GlobalRegistry] = None, standalone: bool = False, priority: int = 0, @@ -88,7 +97,7 @@ async def spawn_tasks( namespace: Optional[str] = None, stop_flag: Optional[Flag] = None, ready_flag: Optional[Flag] = None, -) -> Collection[asyncio.Task]: +) -> Tasks: """ Spawn all the tasks needed to run the operator. @@ -99,9 +108,9 @@ async def spawn_tasks( # The freezer and the registry are scoped to this whole task-set, to sync them all. lifecycle = lifecycle if lifecycle is not None else lifecycles.get_default_lifecycle() registry = registry if registry is not None else registries.get_default_registry() - event_queue = asyncio.Queue(loop=loop) - freeze_flag = asyncio.Event(loop=loop) - signal_flag = asyncio.Future(loop=loop) + event_queue: posting.K8sEventQueue = asyncio.Queue(loop=loop) + freeze_flag: asyncio.Event = asyncio.Event(loop=loop) + signal_flag: asyncio_Future = asyncio.Future(loop=loop) tasks = [] # A top-level task for external stopping by setting a stop-flag. Once set, @@ -162,7 +171,11 @@ async def spawn_tasks( return tasks -async def run_tasks(root_tasks, *, ignored: Collection[asyncio.Task] = frozenset()): +async def run_tasks( + root_tasks: Tasks, + *, + ignored: Tasks = frozenset(), +) -> None: """ Orchestrate the tasks and terminate them gracefully when needed. 
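The ``Flag`` union above deliberately mixes synchronization primitives from different worlds (asyncio, threads, executors), so that operator-embedding code can signal the operator with whichever primitive it already has. Roughly how such a union can be awaited uniformly, as a sketch under those assumptions (the framework's own ``_wait_flag`` below differs in details)::

    import asyncio
    import concurrent.futures
    import threading
    from typing import Union

    Flag = Union[asyncio.Event, concurrent.futures.Future, threading.Event]

    async def wait_flag(flag: Flag) -> None:
        # Dispatch on the flag's type: asyncio primitives are awaited directly,
        # thread/executor primitives are waited for in a thread pool.
        loop = asyncio.get_running_loop()
        if isinstance(flag, asyncio.Event):
            await flag.wait()
        elif isinstance(flag, concurrent.futures.Future):
            await loop.run_in_executor(None, flag.result)
        elif isinstance(flag, threading.Event):
            await loop.run_in_executor(None, flag.wait)

    flag = threading.Event()
    flag.set()
    asyncio.run(wait_flag(flag))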
@@ -215,20 +228,31 @@ async def run_tasks(root_tasks, *, ignored: Collection[asyncio.Task] = frozenset await _reraise(root_done | root_cancelled | hung_done | hung_cancelled) -async def _all_tasks(ignored: Collection[asyncio.Task] = frozenset()) -> Collection[asyncio.Task]: +async def _all_tasks( + ignored: Tasks = frozenset(), +) -> Tasks: current_task = asyncio.current_task() return {task for task in asyncio.all_tasks() if task is not current_task and task not in ignored} -async def _wait(tasks, *, timeout=None, return_when=asyncio.ALL_COMPLETED): +async def _wait( + tasks: Tasks, + *, + timeout: Optional[float] = None, + return_when: Any = asyncio.ALL_COMPLETED, +) -> Tuple[Set[asyncio_Task], Set[asyncio_Task]]: if not tasks: - return set(), () + return set(), set() done, pending = await asyncio.wait(tasks, timeout=timeout, return_when=return_when) - return done, pending + return cast(Set[asyncio_Task], done), cast(Set[asyncio_Task], pending) -async def _stop(tasks, title, cancelled): +async def _stop( + tasks: Tasks, + title: str, + cancelled: bool, +) -> Tuple[Set[asyncio_Task], Set[asyncio_Task]]: if not tasks: logger.debug(f"{title} tasks stopping is skipped: no tasks given.") return set(), set() @@ -254,10 +278,12 @@ async def _stop(tasks, title, cancelled): are = 'are' if not pending else 'are not' why = 'cancelled normally' if cancelled else 'finished normally' logger.debug(f"{title} tasks {are} stopped: {why}; tasks left: {pending!r}") - return done, pending + return cast(Set[asyncio_Task], done), cast(Set[asyncio_Task], pending) -async def _reraise(tasks): +async def _reraise( + tasks: Tasks, +) -> None: for task in tasks: try: task.result() # can raise the regular (non-cancellation) exceptions. @@ -265,7 +291,10 @@ async def _reraise(tasks): pass -async def _root_task_checker(name, coro): +async def _root_task_checker( + name: Text, + coro: Coroutine[Any, Any, Any], +) -> None: try: await coro except asyncio.CancelledError: @@ -279,17 +308,23 @@ async def _root_task_checker(name, coro): async def _stop_flag_checker( - signal_flag: asyncio.Future, + signal_flag: asyncio_Future, ready_flag: Optional[Flag], stop_flag: Optional[Flag], -): +) -> None: # TODO: collect the readiness of all root tasks instead, and set this one only when fully ready. # Notify the caller that we are ready to be executed. await _raise_flag(ready_flag) + # Selects the flags to be awaited (if set). + flags = [] + if signal_flag is not None: + flags.append(signal_flag) + if stop_flag is not None: + flags.append(asyncio.create_task(_wait_flag(stop_flag))) + # Wait until one of the stoppers is set/raised. try: - flags = [signal_flag] + ([] if stop_flag is None else [_wait_flag(stop_flag)]) done, pending = await asyncio.wait(flags, return_when=asyncio.FIRST_COMPLETED) future = done.pop() result = await future @@ -304,7 +339,11 @@ async def _stop_flag_checker( logger.info("Stop-flag is set to %r. Operator is stopping.", result) -def create_tasks(loop: asyncio.AbstractEventLoop, *arg, **kwargs): +def create_tasks( + loop: asyncio.AbstractEventLoop, + *arg: Any, + **kwargs: Any, +) -> Tasks: """ .. deprecated:: 1.0 This is a synchronous interface to `spawn_tasks`. @@ -319,7 +358,7 @@ def create_tasks(loop: asyncio.AbstractEventLoop, *arg, **kwargs): async def _wait_flag( flag: Optional[Flag], -): +) -> Any: """ Wait for a flag to be raised. @@ -344,7 +383,7 @@ async def _wait_flag( async def _raise_flag( flag: Optional[Flag], -): +) -> None: """ Raise a flag. 
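The ``asyncio_Task``/``asyncio_Future`` aliases above rely on a recurring trick in this patch: subscripted forms like ``asyncio.Task[None]`` are valid for the type checker but raise ``TypeError`` at runtime on the Python versions targeted here, so the generic spelling is confined to a ``TYPE_CHECKING`` branch with a bare-class fallback. A minimal sketch of the trick::

    import asyncio
    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        TaskOfNone = asyncio.Task[None]   # seen only by mypy
    else:
        TaskOfNone = asyncio.Task         # what actually executes

    async def noop() -> None:
        pass

    async def main() -> None:
        task: TaskOfNone = asyncio.create_task(noop())
        await task

    asyncio.run(main())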
diff --git a/kopf/reactor/state.py b/kopf/reactor/state.py index 305bd061..f7d7af96 100644 --- a/kopf/reactor/state.py +++ b/kopf/reactor/state.py @@ -52,33 +52,59 @@ import collections.abc import copy import datetime +from typing import Any, Optional +from kopf.reactor import registries +from kopf.structs import bodies +from kopf.structs import patches -def is_started(*, body, handler): + +def is_started( + *, + body: bodies.Body, + handler: registries.Handler, +) -> bool: progress = body.get('status', {}).get('kopf', {}).get('progress', {}) return handler.id in progress -def is_sleeping(*, body, handler): +def is_sleeping( + *, + body: bodies.Body, + handler: registries.Handler, +) -> bool: ts = get_awake_time(body=body, handler=handler) finished = is_finished(body=body, handler=handler) return not finished and ts is not None and ts > datetime.datetime.utcnow() -def is_awakened(*, body, handler): +def is_awakened( + *, + body: bodies.Body, + handler: registries.Handler, +) -> bool: finished = is_finished(body=body, handler=handler) sleeping = is_sleeping(body=body, handler=handler) return bool(not finished and not sleeping) -def is_finished(*, body, handler): +def is_finished( + *, + body: bodies.Body, + handler: registries.Handler, +) -> bool: progress = body.get('status', {}).get('kopf', {}).get('progress', {}) success = progress.get(handler.id, {}).get('success', None) failure = progress.get(handler.id, {}).get('failure', None) return bool(success or failure) -def get_start_time(*, body, patch, handler): +def get_start_time( + *, + body: bodies.Body, + patch: patches.Patch, + handler: registries.Handler, +) -> Optional[datetime.datetime]: progress = patch.get('status', {}).get('kopf', {}).get('progress', {}) new_value = progress.get(handler.id, {}).get('started', None) progress = body.get('status', {}).get('kopf', {}).get('progress', {}) @@ -87,37 +113,63 @@ def get_start_time(*, body, patch, handler): return None if value is None else datetime.datetime.fromisoformat(value) -def get_awake_time(*, body, handler): +def get_awake_time( + *, + body: bodies.Body, + handler: registries.Handler, +) -> Optional[datetime.datetime]: progress = body.get('status', {}).get('kopf', {}).get('progress', {}) value = progress.get(handler.id, {}).get('delayed', None) return None if value is None else datetime.datetime.fromisoformat(value) -def get_retry_count(*, body, handler): +def get_retry_count( + *, + body: bodies.Body, + handler: registries.Handler, +) -> int: progress = body.get('status', {}).get('kopf', {}).get('progress', {}) return progress.get(handler.id, {}).get('retries', None) or 0 -def set_start_time(*, body, patch, handler): +def set_start_time( + *, + body: bodies.Body, + patch: patches.Patch, + handler: registries.Handler, +) -> None: progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {}) progress.setdefault(handler.id, {}).update({ 'started': datetime.datetime.utcnow().isoformat(), }) -def set_awake_time(*, body, patch, handler, delay=None): +def set_awake_time( + *, + body: bodies.Body, + patch: patches.Patch, + handler: registries.Handler, + delay: Optional[float] = None, +) -> None: + ts_str: Optional[str] if delay is not None: ts = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay) - ts = ts.isoformat() + ts_str = ts.isoformat() else: - ts = None + ts_str = None progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {}) progress.setdefault(handler.id, {}).update({ - 'delayed': ts, + 'delayed': 
ts_str,
     })
 
 
-def set_retry_time(*, body, patch, handler, delay=None):
+def set_retry_time(
+        *,
+        body: bodies.Body,
+        patch: patches.Patch,
+        handler: registries.Handler,
+        delay: Optional[float] = None,
+) -> None:
     retry = get_retry_count(body=body, handler=handler)
     progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {})
     progress.setdefault(handler.id, {}).update({
@@ -126,7 +178,13 @@ def set_retry_time(*, body, patch, handler, delay=None):
     set_awake_time(body=body, patch=patch, handler=handler, delay=delay)
 
 
-def store_failure(*, body, patch, handler, exc):
+def store_failure(
+        *,
+        body: bodies.Body,
+        patch: patches.Patch,
+        handler: registries.Handler,
+        exc: BaseException,
+) -> None:
     retry = get_retry_count(body=body, handler=handler)
     progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {})
     progress.setdefault(handler.id, {}).update({
@@ -137,7 +195,13 @@ def store_failure(*, body, patch, handler, exc):
     })
 
 
-def store_success(*, body, patch, handler, result=None):
+def store_success(
+        *,
+        body: bodies.Body,
+        patch: patches.Patch,
+        handler: registries.Handler,
+        result: Any = None,
+) -> None:
     retry = get_retry_count(body=body, handler=handler)
     progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {})
     progress.setdefault(handler.id, {}).update({
@@ -149,7 +213,12 @@ def store_success(*, body, patch, handler, result=None):
     store_result(patch=patch, handler=handler, result=result)
 
 
-def store_result(*, patch, handler, result=None):
+def store_result(
+        *,
+        patch: patches.Patch,
+        handler: registries.Handler,
+        result: Any = None,
+) -> None:
     if result is None:
         pass
     elif isinstance(result, collections.abc.Mapping):
@@ -160,5 +229,9 @@ def store_result(*, patch, handler, result=None):
         patch.setdefault('status', {})[handler.id] = copy.deepcopy(result)
 
 
-def purge_progress(*, body, patch):
+def purge_progress(
+        *,
+        body: bodies.Body,
+        patch: patches.Patch,
+) -> None:
     patch.setdefault('status', {}).setdefault('kopf', {})['progress'] = None
diff --git a/kopf/structs/bodies.py b/kopf/structs/bodies.py
index 1dd7ee02..f807393a 100644
--- a/kopf/structs/bodies.py
+++ b/kopf/structs/bodies.py
@@ -1,9 +1,156 @@
 """
 All the structures coming from/to the Kubernetes API.
+
+The usage of these classes is spread over the codebase, so they are extracted
+into a separate module of such type definitions.
+
+For strict type-checking, they are detailed to the per-field level
+(e.g. `TypedDict` instead of just ``Mapping[Any, Any]``) --
+as used by the framework. The operators can use arbitrary fields at runtime,
+which are not declared in the type definitions at type-checking time.
+
+In case the operators are also type-checked, type casting can be used
+(without `cast`, this code fails at type-checking, though works at runtime)::
+
+    from typing import cast
+    import kopf
+
+    class MyMeta(kopf.Meta):
+        unknownField: str
+
+    @kopf.on.create('zalando.org', 'v1', 'kopfexamples')
+    def create_fn(*args, meta: kopf.Meta, **kwargs):
+        meta = cast(MyMeta, meta)
+        print(meta['unknownField'])
+
+.. note::
+
+    There is a strict separation of objects coming from/to the Kubernetes API
+    and from (but not to) the users:
+
+    The Kubernetes-originated objects are dicts or dict-like custom classes.
+    The framework internally expects them to be such. Arbitrary 3rd-party
+    classes are not supported and are not delivered to the handlers.
+ + The user-originated objects can be either one of the Kubernetes-originated + framework-supported types (dicts/dict-like), or a 3rd-party class, + such as from ``pykube-ng``, ``kubernetes`` client, etc -- as long as it is + supported by the framework's object-processing functions. + + In the future, extra classes can be added for the user-originated objects + and object-processing functions. The internal dicts will remain the same. """ +from typing import Any, Mapping, Union, List, Optional, cast + +from typing_extensions import TypedDict, Literal + +# +# The bodies and body parts, as specially used by the framework. +# All non-used payload falls into `Any`, and is not type-checked. +# + +Labels = Mapping[str, str] +Annotations = Mapping[str, str] +Spec = Mapping[str, Any] +Status = Mapping[str, Any] + + +class Meta(TypedDict, total=False): + uid: str + name: str + namespace: str + labels: Labels + annotations: Annotations + finalizers: List[str] + resourceVersion: str + deletionTimestamp: str + creationTimestamp: str + selfLink: str + + +class Body(TypedDict, total=False): + apiVersion: str + kind: str + metadata: Meta + spec: Spec + status: Status + + +# +# Body/Meta essences only contain the fields relevant for object diff tracking. +# They are presented to the user as part of the diff's `old`/`new` fields & kwargs. +# Added for stricter type checking, to differentiate from the actual Body/Meta. +# + + +class MetaEssence(TypedDict, total=False): + labels: Labels + annotations: Annotations + + +class BodyEssence(TypedDict, total=False): + metadata: MetaEssence + spec: Spec + + +# +# Watch-events, as received from the watch-streams +# and passed through the framework to the handlers. +# + +# ``None`` is used for the listing, when the pseudo-watch-stream is simulated. +RawEventType = Literal[None, 'ADDED', 'MODIFIED', 'DELETED', 'ERROR'] +EventType = Literal[None, 'ADDED', 'MODIFIED', 'DELETED'] + + +# A special payload for type==ERROR (this is not a connection or client error). +class Error(TypedDict, total=False): + apiVersion: str # usually: Literal['v1'] + kind: str # usually: Literal['Status'] + metadata: Mapping[Any, Any] + code: int + reason: str + status: str + message: str + + +# As received from the stream before processing the errors and special cases. +class RawEvent(TypedDict): + type: RawEventType + object: Union[Body, Error] + + +# As passed to the framework after processing the errors and special cases. +class Event(TypedDict): + type: EventType + object: Body + + +# +# Other API types, which are not body parts. +# + +class ObjectReference(TypedDict, total=False): + apiVersion: str + kind: str + namespace: Optional[str] + name: str + uid: str + + +class OwnerReference(TypedDict, total=False): + controller: bool + blockOwnerDeletion: bool + apiVersion: str + kind: str + name: str + uid: str + -def build_object_reference(body): +def build_object_reference( + body: Body, +) -> ObjectReference: """ Construct an object reference for the events. @@ -17,10 +164,12 @@ def build_object_reference(body): uid=body.get('metadata', {}).get('uid'), namespace=body.get('metadata', {}).get('namespace'), ) - return {key: val for key, val in ref.items() if val} + return cast(ObjectReference, {key: val for key, val in ref.items() if val}) -def build_owner_reference(body): +def build_owner_reference( + body: Body, +) -> OwnerReference: """ Construct an owner reference object for the parent-children relationships. 
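``build_object_reference()`` and ``build_owner_reference()`` both filter out empty fields with a dict comprehension, which produces a plain ``Dict[str, Any]`` as far as the type checker is concerned; the ``cast`` re-labels the result as the TypedDict with no runtime effect. A condensed, hedged sketch of the idiom with illustrative fields::

    from typing import Optional, cast
    from typing_extensions import TypedDict

    class ObjectReference(TypedDict, total=False):
        kind: str
        name: str

    def build_ref(kind: Optional[str], name: Optional[str]) -> ObjectReference:
        ref = dict(kind=kind, name=name)
        # The comprehension yields Dict[str, Optional[str]], not the TypedDict,
        # so the result is cast back after the falsy fields are dropped.
        return cast(ObjectReference, {key: val for key, val in ref.items() if val})

    print(build_ref('Pod', None))  # {'kind': 'Pod'}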
@@ -38,4 +187,4 @@ def build_owner_reference(body):
         name=body.get('metadata', {}).get('name'),
         uid=body.get('metadata', {}).get('uid'),
     )
-    return {key: val for key, val in ref.items() if val}
+    return cast(OwnerReference, {key: val for key, val in ref.items() if val})
diff --git a/kopf/structs/dicts.py b/kopf/structs/dicts.py
index 5ee79cc1..b1bfe7a1 100644
--- a/kopf/structs/dicts.py
+++ b/kopf/structs/dicts.py
@@ -3,11 +3,11 @@
 """
 import collections.abc
 import enum
-from typing import (Any, Union, MutableMapping, Mapping, Tuple, List, Text,
-                    Iterable, Iterator, Optional, TypeVar)
+from typing import (TypeVar, Any, Union, MutableMapping, Mapping, Tuple, List,
+                    Iterable, Iterator, Optional)
 
 FieldPath = Tuple[str, ...]
-FieldSpec = Union[None, Text, FieldPath, List[str]]
+FieldSpec = Union[None, str, FieldPath, List[str]]
 
 _T = TypeVar('_T')
 
@@ -40,7 +40,7 @@ def parse_field(
 
 
 def resolve(
-        d: Mapping,
+        d: Optional[Mapping[Any, Any]],
         field: FieldSpec,
         default: Union[_T, _UNSET] = _UNSET.token,
         *,
@@ -73,10 +73,10 @@ def resolve(
 
 
 def ensure(
-        d: MutableMapping,
+        d: MutableMapping[Any, Any],
         field: FieldSpec,
         value: Any,
-):
+) -> None:
     """
     Force-set a nested sub-field in a dict.
     """
@@ -93,10 +93,10 @@ def ensure(
 
 
 def cherrypick(
-        src: Mapping,
-        dst: MutableMapping,
+        src: Mapping[Any, Any],
+        dst: MutableMapping[Any, Any],
         fields: Optional[Iterable[FieldSpec]],
-):
+) -> None:
     """
     Copy all specified fields between dicts (from src to dst).
     """
@@ -109,16 +109,33 @@ def cherrypick(
 
 
 def walk(
-        objs,
+        objs: Union[_T,
+                    Iterable[_T],
+                    Iterable[Union[_T,
+                                   Iterable[_T]]]],
+        *,
         nested: Optional[Iterable[FieldSpec]] = None,
-):
+) -> Iterator[_T]:
     """
-    Iterate over one or many dicts (and sub-dicts recursively).
+    Iterate over objects, flattening the lists/tuples/iterables recursively.
+
+    In plain English, the source is either an object, or a list/tuple/iterable
+    of objects with any level of nesting. The dicts/mappings are excluded,
+    even though they are iterables too, as they are treated as objects themselves.
+
+    For the output, it yields all the objects in a flat iterable suitable for::
+
+        for obj in walk(objs):
+            pass
+
+    The type declares only 2-level nesting, but this is done only
+    due to type-checker limitations. The actual nesting can be infinite.
+    It is highly unlikely that there will be anything deeper than one level.
""" if objs is None: - return + pass elif isinstance(objs, collections.abc.Mapping): - yield objs + yield objs # type: ignore for subfield in (nested if nested is not None else []): try: yield resolve(objs, parse_field(subfield)) @@ -158,7 +175,7 @@ def __init__(self, __src: Mapping[Any, Any], __path: FieldSpec = None): self._src = __src self._path = parse_field(__path) - def __repr__(self): + def __repr__(self) -> str: return repr(dict(self)) def __len__(self) -> int: diff --git a/kopf/structs/diffs.py b/kopf/structs/diffs.py index 5c0e7fd0..bbd68cfe 100644 --- a/kopf/structs/diffs.py +++ b/kopf/structs/diffs.py @@ -3,7 +3,7 @@ """ import collections.abc import enum -from typing import Any, Iterator, Sequence, NamedTuple, Iterable +from typing import Any, Iterator, Sequence, NamedTuple, Iterable, Union, overload from kopf.structs import dicts @@ -13,10 +13,10 @@ class DiffOperation(str, enum.Enum): CHANGE = 'change' REMOVE = 'remove' - def __str__(self): - return self.value + def __str__(self) -> str: + return str(self.value) - def __repr__(self): + def __repr__(self) -> str: return repr(self.value) @@ -26,17 +26,23 @@ class DiffItem(NamedTuple): old: Any new: Any - def __repr__(self): + def __repr__(self) -> str: return repr(tuple(self)) - def __eq__(self, other): - return tuple(self) == tuple(other) + def __eq__(self, other: object) -> bool: + if isinstance(other, collections.abc.Sequence): + return tuple(self) == tuple(other) + else: + return NotImplemented - def __ne__(self, other): - return tuple(self) != tuple(other) + def __ne__(self, other: object) -> bool: + if isinstance(other, collections.abc.Sequence): + return tuple(self) != tuple(other) + else: + return NotImplemented @property - def op(self): + def op(self) -> DiffOperation: return self.operation @@ -46,26 +52,41 @@ def __init__(self, __items: Iterable[DiffItem]): super().__init__() self._items = tuple(DiffItem(*item) for item in __items) - def __repr__(self): + def __repr__(self) -> str: return repr(self._items) - def __len__(self): + def __len__(self) -> int: return len(self._items) - def __iter__(self): + def __iter__(self) -> Iterator[DiffItem]: return iter(self._items) - def __getitem__(self, item): + @overload + def __getitem__(self, i: int) -> DiffItem: ... + + @overload + def __getitem__(self, s: slice) -> Sequence[DiffItem]: ... + + def __getitem__(self, item: Union[int, slice]) -> Union[DiffItem, Sequence[DiffItem]]: return self._items[item] - def __eq__(self, other): - return tuple(self) == tuple(other) + def __eq__(self, other: object) -> bool: + if isinstance(other, collections.abc.Sequence): + return tuple(self) == tuple(other) + else: + return NotImplemented - def __ne__(self, other): - return tuple(self) != tuple(other) + def __ne__(self, other: object) -> bool: + if isinstance(other, collections.abc.Sequence): + return tuple(self) != tuple(other) + else: + return NotImplemented -def reduce_iter(d: Diff, path: dicts.FieldPath) -> Iterator[DiffItem]: +def reduce_iter( + d: Diff, + path: dicts.FieldPath, +) -> Iterator[DiffItem]: for op, field, old, new in d: # As-is diff (i.e. a root field). 
@@ -86,11 +107,18 @@ def reduce_iter(d: Diff, path: dicts.FieldPath) -> Iterator[DiffItem]:
         yield from diff_iter(old_tail, new_tail)
 
 
-def reduce(d: Diff, path: dicts.FieldPath) -> Diff:
+def reduce(
+        d: Diff,
+        path: dicts.FieldPath,
+) -> Diff:
     return Diff(reduce_iter(d, path))
 
 
-def diff_iter(a: Any, b: Any, path: dicts.FieldPath = ()) -> Iterator[DiffItem]:
+def diff_iter(
+        a: Any,
+        b: Any,
+        path: dicts.FieldPath = (),
+) -> Iterator[DiffItem]:
     """
     Calculate the diff between two dicts.
 
@@ -130,8 +158,15 @@ def diff_iter(a: Any, b: Any, path: dicts.FieldPath = ()) -> Iterator[DiffItem]:
         yield DiffItem(DiffOperation.CHANGE, path, a, b)
 
 
-def diff(a: Any, b: Any, path: dicts.FieldPath = ()) -> Diff:
+def diff(
+        a: Any,
+        b: Any,
+        path: dicts.FieldPath = (),
+) -> Diff:
     """
     Same as `diff_iter`, but returns the whole diff instead of an iterator.
     """
     return Diff(diff_iter(a, b, path=path))
+
+
+EMPTY = diff(None, None)
diff --git a/kopf/structs/finalizers.py b/kopf/structs/finalizers.py
index a8014961..8a6e1914 100644
--- a/kopf/structs/finalizers.py
+++ b/kopf/structs/finalizers.py
@@ -5,6 +5,8 @@
 are removed, meaning that the operator has done all its duties to "release"
 the object (e.g. cleanups; delete-handlers in our case).
 """
+from kopf.structs import bodies
+from kopf.structs import patches
 
 # A string marker to be put on the list of the finalizers to block
 # the object from being deleted without the permission of the framework.
@@ -12,23 +14,35 @@
 LEGACY_FINALIZER = 'KopfFinalizerMarker'
 
 
-def is_deleted(body):
+def is_deleted(
+        body: bodies.Body,
+) -> bool:
     return body.get('metadata', {}).get('deletionTimestamp', None) is not None
 
 
-def has_finalizers(body):
+def has_finalizers(
+        body: bodies.Body,
+) -> bool:
     finalizers = body.get('metadata', {}).get('finalizers', [])
     return FINALIZER in finalizers or LEGACY_FINALIZER in finalizers
 
 
-def append_finalizers(*, body, patch):
+def append_finalizers(
+        *,
+        body: bodies.Body,
+        patch: patches.Patch,
+) -> None:
     if not has_finalizers(body=body):
         finalizers = body.get('metadata', {}).get('finalizers', [])
         patch.setdefault('metadata', {}).setdefault('finalizers', list(finalizers))
         patch['metadata']['finalizers'].append(FINALIZER)
 
 
-def remove_finalizers(*, body, patch):
+def remove_finalizers(
+        *,
+        body: bodies.Body,
+        patch: patches.Patch,
+) -> None:
     if has_finalizers(body=body):
         finalizers = body.get('metadata', {}).get('finalizers', [])
         patch.setdefault('metadata', {}).setdefault('finalizers', list(finalizers))
diff --git a/kopf/structs/lastseen.py b/kopf/structs/lastseen.py
index d8f362a1..b2f5ebf6 100644
--- a/kopf/structs/lastseen.py
+++ b/kopf/structs/lastseen.py
@@ -15,15 +15,21 @@
 """
 import copy
 import json
+from typing import Optional, Iterable, Tuple, Dict, Any, cast
 
+from kopf.structs import bodies
 from kopf.structs import dicts
 from kopf.structs import diffs
+from kopf.structs import patches
 
 LAST_SEEN_ANNOTATION = 'kopf.zalando.org/last-handled-configuration'
 """ The annotation name for the last stored state of the resource. """
 
 
-def get_state(body, extra_fields=None):
+def get_state(
+        body: bodies.Body,
+        extra_fields: Optional[Iterable[dicts.FieldSpec]] = None,
+) -> bodies.BodyEssence:
     """
     Extract only the relevant fields for the state comparisons.
 
@@ -38,29 +44,29 @@ def get_state(body, extra_fields=None):
     """
 
     # Always use a copy, so that future changes do not affect the extracted state.
-    orig = copy.deepcopy(body)
-    body = copy.deepcopy(body)
+    # TODO: is it possible to declare state as a Body, to ensure the proper keys are used?
+    essence = cast(Dict[Any, Any], copy.deepcopy(body))
 
     # The top-level identifying fields never change, so there is no need to track them.
-    if 'apiVersion' in body:
-        del body['apiVersion']
-    if 'kind' in body:
-        del body['kind']
+    if 'apiVersion' in essence:
+        del essence['apiVersion']
+    if 'kind' in essence:
+        del essence['kind']
 
     # Purge the whole stanzas with system info (extra-fields are restored below).
-    if 'metadata' in body:
-        del body['metadata']
-    if 'status' in body:
-        del body['status']
+    if 'metadata' in essence:
+        del essence['metadata']
+    if 'status' in essence:
+        del essence['status']
 
     # We want some selected metadata to be tracked implicitly.
-    dicts.cherrypick(src=orig, dst=body, fields=[
+    dicts.cherrypick(src=body, dst=essence, fields=[
         'metadata.labels',
         'metadata.annotations',  # but not all of them! deleted below.
     ])
 
     # But we do not want all of the annotations, only the potentially useful ones.
-    annotations = body.get('metadata', {}).get('annotations', {})
+    annotations = essence.get('metadata', {}).get('annotations', {})
     for annotation in list(annotations):
         if annotation == LAST_SEEN_ANNOTATION:
             del annotations[annotation]
@@ -68,35 +74,50 @@ def get_state(body, extra_fields=None):
             del annotations[annotation]
 
     # Restore all explicitly whitelisted extra-fields from the original body.
-    dicts.cherrypick(src=orig, dst=body, fields=extra_fields)
+    dicts.cherrypick(src=body, dst=essence, fields=extra_fields)
 
     # Cleanup the parent structs if they have become empty, for consistent state comparison.
-    if 'annotations' in body.get('metadata', {}) and not body['metadata']['annotations']:
-        del body['metadata']['annotations']
-    if 'metadata' in body and not body['metadata']:
-        del body['metadata']
-    if 'status' in body and not body['status']:
-        del body['status']
-    return body
+    if 'annotations' in essence.get('metadata', {}) and not essence['metadata']['annotations']:
+        del essence['metadata']['annotations']
+    if 'metadata' in essence and not essence['metadata']:
+        del essence['metadata']
+    if 'status' in essence and not essence['status']:
+        del essence['status']
+
+    return cast(bodies.BodyEssence, essence)
 
 
-def has_state(body):
+def has_essence_stored(
+        body: bodies.Body,
+) -> bool:
     annotations = body.get('metadata', {}).get('annotations', {})
     return LAST_SEEN_ANNOTATION in annotations
 
 
-def get_state_diffs(body, extra_fields=None):
-    old = retreive_state(body)
-    new = get_state(body, extra_fields=extra_fields)
+def get_essential_diffs(
+        body: bodies.Body,
+        extra_fields: Optional[Iterable[dicts.FieldSpec]] = None,
+) -> Tuple[Optional[bodies.BodyEssence], Optional[bodies.BodyEssence], diffs.Diff]:
+    old: Optional[bodies.BodyEssence] = retreive_essence(body)
+    new: Optional[bodies.BodyEssence] = get_state(body, extra_fields=extra_fields)
     return old, new, diffs.diff(old, new)
 
 
-def retreive_state(body):
-    state_str = body.get('metadata', {}).get('annotations', {}).get(LAST_SEEN_ANNOTATION, None)
-    state_obj = json.loads(state_str) if state_str is not None else None
+def retreive_essence(
+        body: bodies.Body,
+) -> Optional[bodies.BodyEssence]:
+    if not has_essence_stored(body):
+        return None
+    state_str: str = body['metadata']['annotations'][LAST_SEEN_ANNOTATION]
+    state_obj: bodies.BodyEssence = json.loads(state_str)
     return state_obj
 
 
-def refresh_state(*, body, patch, extra_fields=None):
+def refresh_essence(
+        *,
+        body: bodies.Body,
patch: patches.Patch, + extra_fields: Optional[Iterable[dicts.FieldSpec]] = None, +) -> None: state = get_state(body, extra_fields=extra_fields) patch.setdefault('metadata', {}).setdefault('annotations', {})[LAST_SEEN_ANNOTATION] = json.dumps(state) diff --git a/kopf/structs/patches.py b/kopf/structs/patches.py new file mode 100644 index 00000000..ed5fe775 --- /dev/null +++ b/kopf/structs/patches.py @@ -0,0 +1,16 @@ +""" +All the structures needed for Kubernetes patching. + +Currently, it is implemented via a JSON merge-patch (RFC 7386), +i.e. a simple dictionary with field overrides, and ``None`` for field deletions. + +In the future, it can be extended to a standalone object, which exposes +a dict-like behaviour, and remembers the changes in order of their execution, +and then generates the JSON patch (RFC 6902). +""" +from typing import Any, Dict + + +# Event-handling structures, used internally in the framework and handlers only. +class Patch(Dict[Any, Any]): + pass diff --git a/kopf/structs/resources.py b/kopf/structs/resources.py index 1d16fc3c..29c1bd55 100644 --- a/kopf/structs/resources.py +++ b/kopf/structs/resources.py @@ -8,10 +8,10 @@ class Resource(NamedTuple): plural: str @property - def name(self): + def name(self) -> str: return f'{self.plural}.{self.group}' @property - def api_version(self): + def api_version(self) -> str: # Strip heading/trailing slashes if group is absent (e.g. for pods). return f'{self.group}/{self.version}'.strip('/') diff --git a/kopf/toolkits/hierarchies.py b/kopf/toolkits/hierarchies.py index cf3cb71a..0a171634 100644 --- a/kopf/toolkits/hierarchies.py +++ b/kopf/toolkits/hierarchies.py @@ -1,45 +1,62 @@ """ All the functions to properly build the object hierarchies. """ +from typing import Optional, Iterable, Iterator, cast, MutableMapping, Any, Union + from kopf.structs import bodies from kopf.structs import dicts +K8sObject = MutableMapping[Any, Any] +K8sObjects = Union[K8sObject, Iterable[K8sObject]] + -def append_owner_reference(objs, owner): +def append_owner_reference( + objs: K8sObjects, + owner: bodies.Body, +) -> None: """ Append an owner reference to the resource(s), if it is not yet there. Note: the owned objects are usually not the one being processed, so the whole body can be modified, no patches are needed. """ - owner = bodies.build_owner_reference(owner) - for obj in dicts.walk(objs): + owner_ref = bodies.build_owner_reference(owner) + for obj in cast(Iterator[K8sObject], dicts.walk(objs)): refs = obj.setdefault('metadata', {}).setdefault('ownerReferences', []) - matching = [ref for ref in refs if ref.get('uid') == owner.get('uid')] + matching = [ref for ref in refs if ref.get('uid') == owner_ref['uid']] if not matching: - refs.append(owner) + refs.append(owner_ref) -def remove_owner_reference(objs, owner): +def remove_owner_reference( + objs: K8sObjects, + owner: bodies.Body, +) -> None: """ Remove an owner reference to the resource(s), if it is there. Note: the owned objects are usually not the one being processed, so the whole body can be modified, no patches are needed. 
""" - owner = bodies.build_owner_reference(owner) - for obj in dicts.walk(objs): + owner_ref = bodies.build_owner_reference(owner) + for obj in cast(Iterator[K8sObject], dicts.walk(objs)): refs = obj.setdefault('metadata', {}).setdefault('ownerReferences', []) - matching = [ref for ref in refs if ref.get('uid') == owner.get('uid')] + matching = [ref for ref in refs if ref.get('uid') == owner_ref['uid']] for ref in matching: refs.remove(ref) -def label(objs, labels, *, force=False, nested=None): +def label( + objs: K8sObjects, + labels: bodies.Labels, + *, + force: bool = False, + nested: Optional[Iterable[dicts.FieldSpec]] = None, +) -> None: """ Apply the labels to the object(s). """ - for obj in dicts.walk(objs, nested=nested): + for obj in cast(Iterator[K8sObject], dicts.walk(objs, nested=nested)): obj_labels = obj.setdefault('metadata', {}).setdefault('labels', {}) for key, val in labels.items(): if force: @@ -48,7 +65,11 @@ def label(objs, labels, *, force=False, nested=None): obj_labels.setdefault(key, val) -def harmonize_naming(objs, name=None, strict=False): +def harmonize_naming( + objs: K8sObjects, + name: Optional[str] = None, + strict: bool = False, +) -> None: """ Adjust the names or prefixes of the objects. @@ -63,7 +84,7 @@ def harmonize_naming(objs, name=None, strict=False): If the objects already have their own names, auto-naming is not applied, and the existing names are used as is. """ - for obj in dicts.walk(objs): + for obj in cast(Iterator[K8sObject], dicts.walk(objs)): if obj.get('metadata', {}).get('name', None) is None: if strict: obj.setdefault('metadata', {}).setdefault('name', name) @@ -71,7 +92,10 @@ def harmonize_naming(objs, name=None, strict=False): obj.setdefault('metadata', {}).setdefault('generateName', f'{name}-') -def adjust_namespace(objs, namespace=None): +def adjust_namespace( + objs: K8sObjects, + namespace: Optional[str] = None, +) -> None: """ Adjust the namespace of the objects. @@ -80,11 +104,16 @@ def adjust_namespace(objs, namespace=None): It is a common practice to keep the children objects in the same namespace as their owner, unless explicitly overridden at time of creation. """ - for obj in dicts.walk(objs): + for obj in cast(Iterator[K8sObject], dicts.walk(objs)): obj.setdefault('metadata', {}).setdefault('namespace', namespace) -def adopt(objs, owner, *, nested=None): +def adopt( + objs: K8sObjects, + owner: bodies.Body, + *, + nested: Optional[Iterable[dicts.FieldSpec]] = None, +) -> None: """ The children should be in the same namespace, named after their parent, and owned by it. """ diff --git a/kopf/toolkits/runner.py b/kopf/toolkits/runner.py index 7d1962f6..d1df7975 100644 --- a/kopf/toolkits/runner.py +++ b/kopf/toolkits/runner.py @@ -1,13 +1,28 @@ import asyncio import concurrent.futures +import contextlib import threading +import types +from typing import cast, Any, Optional, Tuple, Type, TYPE_CHECKING import click.testing from kopf import cli +_ExcType = BaseException +_ExcInfo = Tuple[Type[_ExcType], _ExcType, types.TracebackType] -class KopfRunner: +if TYPE_CHECKING: + ResultFuture = concurrent.futures.Future[click.testing.Result] + class _AbstractKopfRunner(contextlib.AbstractContextManager["_AbstractKopfRunner"]): + pass +else: + ResultFuture = concurrent.futures.Future + class _AbstractKopfRunner(contextlib.AbstractContextManager): + pass + + +class KopfRunner(_AbstractKopfRunner): """ A context manager to run a Kopf-based operator in parallel with the tests. 
@@ -37,8 +52,13 @@ class KopfRunner:
     Second, mocking works within one process (all threads),
     but not across processes --- the mock's calls (counts, args) are lost.
     """
+    _future: ResultFuture

-    def __init__(self, *args, reraise=True, timeout=None, **kwargs):
+    def __init__(self,
+                 *args: Any,
+                 reraise: bool = True,
+                 timeout: Optional[float] = None,
+                 **kwargs: Any):
         super().__init__()
         self.args = args
         self.kwargs = kwargs
@@ -49,17 +69,21 @@ def __init__(self, *args, reraise=True, timeout=None, **kwargs):
         self._thread = threading.Thread(target=self._target)
         self._future = concurrent.futures.Future()

-    def __enter__(self):
+    def __enter__(self) -> "KopfRunner":
         self._thread.start()
         self._ready.wait()  # should be nanosecond-fast
         return self

-    def __exit__(self, exc_type, exc_val, exc_tb):
+    def __exit__(self,
+                 exc_type: Optional[Type[BaseException]],
+                 exc_val: Optional[BaseException],
+                 exc_tb: Optional[types.TracebackType],
+                 ) -> Optional[bool]:

         # A coroutine that is injected into the loop to cancel everything in it.
         # Cancellations are caught in `run`, so that it exits gracefully.

         # TODO: also cancel/stop the streaming API calls in the thread executor.
-        async def shutdown():
+        async def shutdown() -> None:
             current_task = asyncio.current_task()
             tasks = [task for task in asyncio.all_tasks() if task is not current_task]
             for task in tasks:
@@ -79,16 +103,18 @@ async def shutdown():

         # Re-raise the exceptions of the threading & invocation logic.
         if self._future.exception() is not None:
             if exc_val is None:
-                raise self._future.exception()
+                raise self._future.exception()  # type: ignore
             else:
-                raise self._future.exception() from exc_val
+                raise self._future.exception() from exc_val  # type: ignore
         if self._future.result().exception is not None and self.reraise:
             if exc_val is None:
                 raise self._future.result().exception
             else:
                 raise self._future.result().exception from exc_val

-    def _target(self):
+        return False
+
+    def _target(self) -> None:

         # Every thread must have its own loop. The parent thread (pytest)
         # needs to know when the loop is set up, to be able to shut it down.
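
The `shutdown()` coroutine above is the core of the runner's teardown: it is injected into the operator's event loop from the test thread and cancels every other task, so that `run()` unwinds via `CancelledError`. The mechanism in isolation (a sketch; `stop_from_another_thread` is a hypothetical helper, not kopf's API):

.. code-block:: python

    import asyncio

    async def _shutdown() -> None:
        current = asyncio.current_task()
        for task in asyncio.all_tasks():
            if task is not current:
                task.cancel()  # the cancelled tasks raise CancelledError where they run

    def stop_from_another_thread(loop: asyncio.AbstractEventLoop) -> None:
        # run_coroutine_threadsafe() is the thread-safe way to inject work into
        # a loop running in a different thread; the future bridges the result back.
        future = asyncio.run_coroutine_threadsafe(_shutdown(), loop)
        future.result()  # re-raises if the shutdown coroutine itself failed
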
@@ -106,37 +132,37 @@ def _target(self): self._future.set_result(result) @property - def future(self): + def future(self) -> ResultFuture: return self._future @property - def output(self): + def output(self) -> str: return self.future.result().output @property - def stdout(self): + def stdout(self) -> str: return self.future.result().stdout @property - def stdout_bytes(self): + def stdout_bytes(self) -> bytes: return self.future.result().stdout_bytes @property - def stderr(self): + def stderr(self) -> str: return self.future.result().stderr @property - def stderr_bytes(self): + def stderr_bytes(self) -> bytes: return self.future.result().stderr_bytes @property - def exit_code(self): + def exit_code(self) -> int: return self.future.result().exit_code @property - def exception(self): - return self.future.result().exception + def exception(self) -> _ExcType: + return cast(_ExcType, self.future.result().exception) @property - def exc_info(self): - return self.future.result().exc_info + def exc_info(self) -> _ExcInfo: + return cast(_ExcInfo, self.future.result().exc_info) diff --git a/kopf/utilities/loaders.py b/kopf/utilities/loaders.py index 53bdfd26..dc135133 100644 --- a/kopf/utilities/loaders.py +++ b/kopf/utilities/loaders.py @@ -18,9 +18,13 @@ import importlib.util import os.path import sys +from typing import Iterable -def preload(paths, modules): +def preload( + paths: Iterable[str], + modules: Iterable[str], +) -> None: """ Ensure the handlers are registered by loading/importing the files/modules. """ @@ -30,7 +34,7 @@ def preload(paths, modules): name, _ = os.path.splitext(os.path.basename(path)) spec = importlib.util.spec_from_file_location(name, path) module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) + spec.loader.exec_module(module) # type: ignore for name in modules: importlib.import_module(name) diff --git a/setup.py b/setup.py index 51cf1775..74536602 100644 --- a/setup.py +++ b/setup.py @@ -40,6 +40,7 @@ 'setuptools_scm', ], install_requires=[ + 'typing_extensions', 'click', 'iso8601', 'aiojobs', diff --git a/tests/hierarchies/test_owner_referencing.py b/tests/hierarchies/test_owner_referencing.py index 24b96baf..32981364 100644 --- a/tests/hierarchies/test_owner_referencing.py +++ b/tests/hierarchies/test_owner_referencing.py @@ -3,23 +3,24 @@ from unittest.mock import call, Mock import kopf +from kopf.structs.bodies import Body, Meta, Labels OWNER_API_VERSION = 'owner-api-version' OWNER_NAMESPACE = 'owner-namespace' OWNER_KIND = 'OwnerKind' OWNER_NAME = 'owner-name' OWNER_UID = 'owner-uid' -OWNER_LABELS = {'label-1': 'value-1', 'label-2': 'value-2'} -OWNER = { - 'apiVersion': OWNER_API_VERSION, - 'kind': OWNER_KIND, - 'metadata': { - 'namespace': OWNER_NAMESPACE, - 'name': OWNER_NAME, - 'uid': OWNER_UID, - 'labels': OWNER_LABELS, - }, -} +OWNER_LABELS: Labels = {'label-1': 'value-1', 'label-2': 'value-2'} +OWNER = Body( + apiVersion=OWNER_API_VERSION, + kind=OWNER_KIND, + metadata=Meta( + namespace=OWNER_NAMESPACE, + name=OWNER_NAME, + uid=OWNER_UID, + labels=OWNER_LABELS, + ), +) def test_appending_to_dict(): diff --git a/tests/invocations/test_callbacks.py b/tests/invocations/test_callbacks.py index 09540a45..2e8c6d0d 100644 --- a/tests/invocations/test_callbacks.py +++ b/tests/invocations/test_callbacks.py @@ -117,7 +117,7 @@ async def wrapper(*args, wrapper=wrapper, **kwargs): async def test_detection_for_none(): is_async = is_async_fn(None) - assert is_async is None + assert not is_async @syncasyncparams diff --git 
a/tests/lifecycles/test_global_defaults.py b/tests/lifecycles/test_global_defaults.py index 3ae50388..6faeb46b 100644 --- a/tests/lifecycles/test_global_defaults.py +++ b/tests/lifecycles/test_global_defaults.py @@ -7,14 +7,14 @@ def test_getting_default_lifecycle(): def test_setting_default_lifecycle(): - lifecycle_expected = lambda *args, **kwargs: None + lifecycle_expected = lambda handlers, *args, **kwargs: handlers kopf.set_default_lifecycle(lifecycle_expected) lifecycle_actual = kopf.get_default_lifecycle() assert lifecycle_actual is lifecycle_expected def test_resetting_default_lifecycle(): - lifecycle_distracting = lambda *args, **kwargs: None + lifecycle_distracting = lambda handlers, *args, **kwargs: handlers kopf.set_default_lifecycle(lifecycle_distracting) kopf.set_default_lifecycle(None) lifecycle_actual = kopf.get_default_lifecycle() diff --git a/tests/test_lastseen.py b/tests/test_lastseen.py index 03fd00db..2dc2f699 100644 --- a/tests/test_lastseen.py +++ b/tests/test_lastseen.py @@ -3,9 +3,9 @@ import pytest from kopf.structs.lastseen import LAST_SEEN_ANNOTATION -from kopf.structs.lastseen import has_state, get_state -from kopf.structs.lastseen import get_state_diffs -from kopf.structs.lastseen import retreive_state, refresh_state +from kopf.structs.lastseen import has_essence_stored, get_state +from kopf.structs.lastseen import get_essential_diffs +from kopf.structs.lastseen import retreive_essence, refresh_essence def test_annotation_is_fqdn(): @@ -19,7 +19,7 @@ def test_annotation_is_fqdn(): pytest.param(True, {'metadata': {'annotations': {LAST_SEEN_ANNOTATION: ''}}}, id='present'), ]) def test_has_state(expected, body): - result = has_state(body=body) + result = has_essence_stored(body=body) assert result == expected @@ -111,7 +111,7 @@ def test_refresh_state(): body = {'spec': {'depth': {'field': 'x'}}} patch = {} encoded = json.dumps(body) # json formatting can vary across interpreters - refresh_state(body=body, patch=patch) + refresh_essence(body=body, patch=patch) assert patch['metadata']['annotations'][LAST_SEEN_ANNOTATION] == encoded @@ -119,13 +119,13 @@ def test_retreive_state_when_present(): data = {'spec': {'depth': {'field': 'x'}}} encoded = json.dumps(data) # json formatting can vary across interpreters body = {'metadata': {'annotations': {LAST_SEEN_ANNOTATION: encoded}}} - state = retreive_state(body=body) + state = retreive_essence(body=body) assert state == data def test_retreive_state_when_absent(): body = {} - state = retreive_state(body=body) + state = retreive_essence(body=body) assert state is None @@ -133,7 +133,7 @@ def test_state_changed_detected(): data = {'spec': {'depth': {'field': 'x'}}} encoded = json.dumps(data) # json formatting can vary across interpreters body = {'metadata': {'annotations': {LAST_SEEN_ANNOTATION: encoded}}} - old, new, diff = get_state_diffs(body=body) + old, new, diff = get_essential_diffs(body=body) assert diff @@ -142,7 +142,7 @@ def test_state_change_ignored_with_garbage_annotations(): encoded = json.dumps(data) # json formatting can vary across interpreters body = {'metadata': {'annotations': {LAST_SEEN_ANNOTATION: encoded}}, 'spec': {'depth': {'field': 'x'}}} - old, new, diff = get_state_diffs(body=body) + old, new, diff = get_essential_diffs(body=body) assert not diff @@ -162,7 +162,7 @@ def test_state_changed_ignored_with_system_fields(): 'other': 'x' }, 'spec': {'depth': {'field': 'x'}}} - old, new, diff = get_state_diffs(body=body) + old, new, diff = get_essential_diffs(body=body) assert not diff @@ -174,7 
+174,7 @@ def test_state_diff(): body = {'metadata': {'annotations': {LAST_SEEN_ANNOTATION: encoded}}, 'status': {'x': 'y'}, 'spec': {'depth': {'field': 'y'}}} - old, new, diff = get_state_diffs(body=body, extra_fields=['status.x']) + old, new, diff = get_essential_diffs(body=body, extra_fields=['status.x']) assert old == {'spec': {'depth': {'field': 'x'}}} assert new == {'spec': {'depth': {'field': 'y'}}, 'status': {'x': 'y'}} assert len(diff) == 2 # spec.depth.field & status.x, but the order is not known. From f4045eacde37cc68e70d4c11befb1833a0477c94 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Wed, 2 Oct 2019 20:17:02 +0200 Subject: [PATCH 5/8] Configure mypy strict checks on CI --- .gitignore | 1 + .travis.yml | 1 + docs/conf.py | 1 + mypy.ini | 3 +++ requirements.txt | 1 + 5 files changed, 7 insertions(+) create mode 100644 mypy.ini diff --git a/.gitignore b/.gitignore index 47e45f5b..02b7604d 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ coverage.xml junit.xml *.cover .hypothesis/ +.mypy_cache # Documentation docs/_build diff --git a/.travis.yml b/.travis.yml index 9a189464..2bc77802 100644 --- a/.travis.yml +++ b/.travis.yml @@ -43,6 +43,7 @@ script: - coveralls - codecov --flags unit - pytest -v --only-e2e # NB: after the coverage uploads! + - mypy kopf --strict --pretty deploy: provider: pypi diff --git a/docs/conf.py b/docs/conf.py index e1a48ea1..83ac0e86 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -40,6 +40,7 @@ intersphinx_mapping = { 'python': ('https://docs.python.org/3', None), + 'mypy': ('https://mypy.readthedocs.io/en/latest/', None), } diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 00000000..fc40ca5e --- /dev/null +++ b/mypy.ini @@ -0,0 +1,3 @@ +[mypy] +warn_unused_configs = True +ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt index 33bd05ca..d006e63f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,3 +13,4 @@ asynctest freezegun codecov coveralls +mypy From f2c70d838c928c279dcf0319a4e7ad047176f2d9 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Sun, 6 Oct 2019 18:40:25 +0200 Subject: [PATCH 6/8] Reformat the class methods to the same indentation as the functions --- kopf/engines/logging.py | 14 ++- kopf/engines/peering.py | 36 +++--- kopf/reactor/handling.py | 9 +- kopf/reactor/lifecycles.py | 13 ++- kopf/reactor/queueing.py | 11 +- kopf/reactor/registries.py | 218 ++++++++++++++++++++----------------- kopf/toolkits/runner.py | 23 ++-- 7 files changed, 176 insertions(+), 148 deletions(-) diff --git a/kopf/engines/logging.py b/kopf/engines/logging.py index eff39d5d..f85af979 100644 --- a/kopf/engines/logging.py +++ b/kopf/engines/logging.py @@ -97,22 +97,24 @@ def __init__(self, *, body: bodies.Body): ), )) - def process(self, - msg: str, - kwargs: MutableMapping[str, Any], - ) -> Tuple[str, MutableMapping[str, Any]]: + def process( + self, + msg: str, + kwargs: MutableMapping[str, Any], + ) -> Tuple[str, MutableMapping[str, Any]]: # Native logging overwrites the message's extra with the adapter's extra. # We merge them, so that both message's & adapter's extras are available. 
kwargs["extra"] = dict(self.extra, **kwargs.get('extra', {})) return msg, kwargs - def log(self, + def log( + self, level: int, msg: str, *args: Any, local: bool = False, **kwargs: Any, - ) -> None: + ) -> None: if local: kwargs['extra'] = dict(kwargs.pop('extra', {}), k8s_skip=True) super().log(level, msg, *args, **kwargs) diff --git a/kopf/engines/peering.py b/kopf/engines/peering.py index 91020bd4..5240a026 100644 --- a/kopf/engines/peering.py +++ b/kopf/engines/peering.py @@ -59,17 +59,18 @@ # The extra fields are for easier calculation when and if the peer is dead to the moment. class Peer: - def __init__(self, - id: str, - *, - name: str, - priority: int = 0, - lastseen: Optional[str] = None, - lifetime: int = 60, - namespace: Optional[str] = None, - legacy: bool = False, - **_: Any, # for the forward-compatibility with the new fields - ): + def __init__( + self, + id: str, + *, + name: str, + priority: int = 0, + lastseen: Optional[str] = None, + lifetime: int = 60, + namespace: Optional[str] = None, + legacy: bool = False, + **_: Any, # for the forward-compatibility with the new fields + ): super().__init__() self.id = id self.name = name @@ -93,12 +94,13 @@ def resource(self) -> resources.Resource: return LEGACY_PEERING_RESOURCE if self.legacy else CLUSTER_PEERING_RESOURCE if self.namespace is None else NAMESPACED_PEERING_RESOURCE @classmethod - def detect(cls, - standalone: bool, - namespace: Optional[str], - name: Optional[str], - **kwargs: Any, - ) -> Optional["Peer"]: + def detect( + cls, + standalone: bool, + namespace: Optional[str], + name: Optional[str], + **kwargs: Any, + ) -> Optional["Peer"]: if standalone: return None diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index a8e33b0d..ae92094b 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -50,10 +50,11 @@ class PermanentError(Exception): class TemporaryError(Exception): """ A potentially recoverable error, should be retried. """ - def __init__(self, - __msg: Optional[str] = None, - delay: Optional[float] = DEFAULT_RETRY_DELAY, - ): + def __init__( + self, + __msg: Optional[str] = None, + delay: Optional[float] = DEFAULT_RETRY_DELAY, + ): super().__init__(__msg) self.delay = delay diff --git a/kopf/reactor/lifecycles.py b/kopf/reactor/lifecycles.py index 7fd79ec5..d4ab607d 100644 --- a/kopf/reactor/lifecycles.py +++ b/kopf/reactor/lifecycles.py @@ -32,12 +32,13 @@ class LifeCycleFn(Protocol): and specific return type. But we cannot express it with `typing`. For the names and types of kwargs, see `Invokable`. """ - def __call__(self, - handlers: Handlers, - *, - body: bodies.Body, - **kwargs: Any, - ) -> Handlers: ... + def __call__( + self, + handlers: Handlers, + *, + body: bodies.Body, + **kwargs: Any, + ) -> Handlers: ... def all_at_once(handlers: Handlers, **kwargs: Any) -> Handlers: diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index 9d83ec06..c7816579 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -41,11 +41,12 @@ class WatcherCallback(Protocol): - async def __call__(self, - *, - event: bodies.Event, - replenished: asyncio.Event, - ) -> None: ... + async def __call__( + self, + *, + event: bodies.Event, + replenished: asyncio.Event, + ) -> None: ... # An end-of-stream marker sent from the watcher to the workers. 
diff --git a/kopf/reactor/registries.py b/kopf/reactor/registries.py index 1c080b8c..6575e536 100644 --- a/kopf/reactor/registries.py +++ b/kopf/reactor/registries.py @@ -33,24 +33,25 @@ class HandlerFn(Protocol): - def __call__(self, - *args: Any, - type: str, - event: Union[str, bodies.Event], # FIXME: or str for cause-handlers. - body: bodies.Body, - meta: bodies.Meta, - spec: bodies.Spec, - status: bodies.Status, - uid: str, - name: str, - namespace: Optional[str], - patch: patches.Patch, - logger: Union[logging.Logger, logging.LoggerAdapter], - diff: diffs.Diff, # TODO:? Optional[diffs.Diff]? - old: bodies.Body, # TODO: or Any for field-handlers. - new: bodies.Body, # TODO: or Any for field-handlers. - **kwargs: Any, - ) -> Any: ... + def __call__( + self, + *args: Any, + type: str, + event: Union[str, bodies.Event], # FIXME: or str for cause-handlers. + body: bodies.Body, + meta: bodies.Meta, + spec: bodies.Spec, + status: bodies.Status, + uid: str, + name: str, + namespace: Optional[str], + patch: patches.Patch, + logger: Union[logging.Logger, logging.LoggerAdapter], + diff: diffs.Diff, # TODO:? Optional[diffs.Diff]? + old: bodies.Body, # TODO: or Any for field-handlers. + new: bodies.Body, # TODO: or Any for field-handlers. + **kwargs: Any, + ) -> Any: ... # A registered handler (function + event meta info). @@ -70,28 +71,32 @@ class BaseRegistry(metaclass=abc.ABCMeta): A registry stores the handlers and provides them to the reactor. """ - def get_cause_handlers(self, - cause: causation.Cause, - ) -> Sequence[Handler]: + def get_cause_handlers( + self, + cause: causation.Cause, + ) -> Sequence[Handler]: return list(self._deduplicated(self.iter_cause_handlers(cause=cause))) @abc.abstractmethod - def iter_cause_handlers(self, - cause: causation.Cause, - ) -> Iterator[Handler]: + def iter_cause_handlers( + self, + cause: causation.Cause, + ) -> Iterator[Handler]: pass - def get_event_handlers(self, - resource: resources_.Resource, - event: bodies.Event, - ) -> Sequence[Handler]: + def get_event_handlers( + self, + resource: resources_.Resource, + event: bodies.Event, + ) -> Sequence[Handler]: return list(self._deduplicated(self.iter_event_handlers(resource=resource, event=event))) @abc.abstractmethod - def iter_event_handlers(self, - resource: resources_.Resource, - event: bodies.Event, - ) -> Iterator[Handler]: + def iter_event_handlers( + self, + resource: resources_.Resource, + event: bodies.Event, + ) -> Iterator[Handler]: pass def get_extra_fields(self, resource: resources_.Resource) -> Set[dicts.FieldPath]: @@ -149,17 +154,18 @@ def __bool__(self) -> bool: def append(self, handler: Handler) -> None: self._handlers.append(handler) - def register(self, - fn: HandlerFn, - id: Optional[str] = None, - event: Optional[str] = None, - field: Optional[dicts.FieldSpec] = None, - timeout: Optional[float] = None, - initial: Optional[bool] = None, - requires_finalizer: bool = False, - labels: Optional[bodies.Labels] = None, - annotations: Optional[bodies.Annotations] = None, - ) -> HandlerFn: + def register( + self, + fn: HandlerFn, + id: Optional[str] = None, + event: Optional[str] = None, + field: Optional[dicts.FieldSpec] = None, + timeout: Optional[float] = None, + initial: Optional[bool] = None, + requires_finalizer: bool = False, + labels: Optional[bodies.Labels] = None, + annotations: Optional[bodies.Annotations] = None, + ) -> HandlerFn: if field is None: field = None # for the non-field events elif isinstance(field, str): @@ -183,9 +189,10 @@ def register(self, return fn # to be 
usable as a decorator too. - def iter_cause_handlers(self, - cause: causation.Cause, - ) -> Iterator[Handler]: + def iter_cause_handlers( + self, + cause: causation.Cause, + ) -> Iterator[Handler]: changed_fields = frozenset(field for _, field, _, _ in cause.diff or []) for handler in self._handlers: if handler.event is None or handler.event == cause.event: @@ -194,25 +201,28 @@ def iter_cause_handlers(self, elif match(handler=handler, body=cause.body, changed_fields=changed_fields): yield handler - def iter_event_handlers(self, - resource: resources_.Resource, - event: bodies.Event, - ) -> Iterator[Handler]: + def iter_event_handlers( + self, + resource: resources_.Resource, + event: bodies.Event, + ) -> Iterator[Handler]: for handler in self._handlers: if match(handler=handler, body=event['object']): yield handler - def iter_extra_fields(self, - resource: resources_.Resource, - ) -> Iterator[dicts.FieldPath]: + def iter_extra_fields( + self, + resource: resources_.Resource, + ) -> Iterator[dicts.FieldPath]: for handler in self._handlers: if handler.field: yield handler.field - def requires_finalizer(self, - resource: resources_.Resource, - body: bodies.Body, - ) -> bool: + def requires_finalizer( + self, + resource: resources_.Resource, + body: bodies.Body, + ) -> bool: # check whether the body matches a deletion handler for handler in self._handlers_requiring_finalizer: if match(handler=handler, body=body): @@ -252,20 +262,21 @@ def __init__(self) -> None: self._cause_handlers: MutableMapping[resources_.Resource, SimpleRegistry] = {} self._event_handlers: MutableMapping[resources_.Resource, SimpleRegistry] = {} - def register_cause_handler(self, - group: str, - version: str, - plural: str, - fn: HandlerFn, - id: Optional[str] = None, - event: Optional[str] = None, - field: Optional[dicts.FieldSpec] = None, - timeout: Optional[float] = None, - initial: Optional[bool] = None, - requires_finalizer: bool = False, - labels: Optional[bodies.Labels] = None, - annotations: Optional[bodies.Annotations] = None, - ) -> HandlerFn: + def register_cause_handler( + self, + group: str, + version: str, + plural: str, + fn: HandlerFn, + id: Optional[str] = None, + event: Optional[str] = None, + field: Optional[dicts.FieldSpec] = None, + timeout: Optional[float] = None, + initial: Optional[bool] = None, + requires_finalizer: bool = False, + labels: Optional[bodies.Labels] = None, + annotations: Optional[bodies.Annotations] = None, + ) -> HandlerFn: """ Register an additional handler function for the specific resource and specific event. """ @@ -275,15 +286,16 @@ def register_cause_handler(self, labels=labels, annotations=annotations) return fn # to be usable as a decorator too. - def register_event_handler(self, - group: str, - version: str, - plural: str, - fn: HandlerFn, - id: Optional[str] = None, - labels: Optional[bodies.Labels] = None, - annotations: Optional[bodies.Annotations] = None, - ) -> HandlerFn: + def register_event_handler( + self, + group: str, + version: str, + plural: str, + fn: HandlerFn, + id: Optional[str] = None, + labels: Optional[bodies.Labels] = None, + annotations: Optional[bodies.Annotations] = None, + ) -> HandlerFn: """ Register an additional handler function for low-level events. """ @@ -297,21 +309,24 @@ def resources(self) -> FrozenSet[resources_.Resource]: """ All known resources in the registry. 
""" return frozenset(self._cause_handlers) | frozenset(self._event_handlers) - def has_cause_handlers(self, - resource: resources_.Resource, - ) -> bool: + def has_cause_handlers( + self, + resource: resources_.Resource, + ) -> bool: resource_registry = self._cause_handlers.get(resource, None) return bool(resource_registry) - def has_event_handlers(self, - resource: resources_.Resource, - ) -> bool: + def has_event_handlers( + self, + resource: resources_.Resource, + ) -> bool: resource_registry = self._event_handlers.get(resource, None) return bool(resource_registry) - def iter_cause_handlers(self, - cause: causation.Cause, - ) -> Iterator[Handler]: + def iter_cause_handlers( + self, + cause: causation.Cause, + ) -> Iterator[Handler]: """ Iterate all handlers that match this cause/event, in the order they were registered (even if mixed). """ @@ -319,10 +334,11 @@ def iter_cause_handlers(self, if resource_registry is not None: yield from resource_registry.iter_cause_handlers(cause=cause) - def iter_event_handlers(self, - resource: resources_.Resource, - event: bodies.Event, - ) -> Iterator[Handler]: + def iter_event_handlers( + self, + resource: resources_.Resource, + event: bodies.Event, + ) -> Iterator[Handler]: """ Iterate all handlers for the low-level events. """ @@ -330,17 +346,19 @@ def iter_event_handlers(self, if resource_registry is not None: yield from resource_registry.iter_event_handlers(resource=resource, event=event) - def iter_extra_fields(self, - resource: resources_.Resource, - ) -> Iterator[dicts.FieldPath]: + def iter_extra_fields( + self, + resource: resources_.Resource, + ) -> Iterator[dicts.FieldPath]: resource_registry = self._cause_handlers.get(resource, None) if resource_registry is not None: yield from resource_registry.iter_extra_fields(resource=resource) - def requires_finalizer(self, - resource: resources_.Resource, - body: bodies.Body, - ) -> bool: + def requires_finalizer( + self, + resource: resources_.Resource, + body: bodies.Body, + ) -> bool: """ Return whether a finalizer should be added to the given resource or not. diff --git a/kopf/toolkits/runner.py b/kopf/toolkits/runner.py index d1df7975..38e3fa64 100644 --- a/kopf/toolkits/runner.py +++ b/kopf/toolkits/runner.py @@ -54,11 +54,13 @@ class KopfRunner(_AbstractKopfRunner): """ _future: ResultFuture - def __init__(self, - *args: Any, - reraise: bool = True, - timeout: Optional[float] = None, - **kwargs: Any): + def __init__( + self, + *args: Any, + reraise: bool = True, + timeout: Optional[float] = None, + **kwargs: Any, + ): super().__init__() self.args = args self.kwargs = kwargs @@ -74,11 +76,12 @@ def __enter__(self) -> "KopfRunner": self._ready.wait() # should be nanosecond-fast return self - def __exit__(self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[types.TracebackType], - ) -> Optional[bool]: + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[types.TracebackType], + ) -> Optional[bool]: # A coroutine that is injected into the loop to cancel everything in it. # Cancellations are caught in `run`, so that it exits gracefully. 
From be2f19758ec9ee41f74844118345e430e7e7025f Mon Sep 17 00:00:00 2001
From: Sergey Vasilyev
Date: Mon, 7 Oct 2019 00:37:18 +0200
Subject: [PATCH 7/8] Fix the logging level documentation (as per PR)

---
 docs/configuring.rst | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/docs/configuring.rst b/docs/configuring.rst
index a8695b74..9b7c9348 100644
--- a/docs/configuring.rst
+++ b/docs/configuring.rst
@@ -10,21 +10,17 @@ Configure logging events
 ========================

 `kopf.config.EventsConfig` allows setting which types of kopf logs should be
-reflected in events.
-
-Loglevels are:
-
-* ``kopf.config.LOGLEVEL_INFO``
-* ``kopf.config.LOGLEVEL_WARNING``
-* ``kopf.config.LOGLEVEL_ERROR``
-* ``kopf.config.LOGLEVEL_CRITICAL``
+reflected in events. Use `logging` constants or integer values to set the level:
+e.g., `logging.WARNING`, `logging.ERROR`, etc. The default is `logging.INFO`.

 .. code-block:: python

+    import logging
     import kopf

     # Now kopf will send events only when an error or critical occasion happens
-    kopf.EventsConfig.events_loglevel = config.LOGLEVEL_ERROR
+    kopf.EventsConfig.events_loglevel = logging.ERROR
+

 Configure Workers
 =================

From 1e5bca7cbd6f395fea9a77036479e55657eb839d Mon Sep 17 00:00:00 2001
From: Sergey Vasilyev
Date: Mon, 7 Oct 2019 00:40:38 +0200
Subject: [PATCH 8/8] Fix the TODO marks in the handler-fn signature (as per PR)

---
 kopf/reactor/registries.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/kopf/reactor/registries.py b/kopf/reactor/registries.py
index 6575e536..2362fea8 100644
--- a/kopf/reactor/registries.py
+++ b/kopf/reactor/registries.py
@@ -37,7 +37,7 @@ def __call__(
         self,
         *args: Any,
         type: str,
-        event: Union[str, bodies.Event],  # FIXME: or str for cause-handlers.
+        event: Union[str, bodies.Event],
         body: bodies.Body,
         meta: bodies.Meta,
         spec: bodies.Spec,
@@ -47,9 +47,9 @@ def __call__(
         namespace: Optional[str],
         patch: patches.Patch,
         logger: Union[logging.Logger, logging.LoggerAdapter],
-        diff: diffs.Diff,  # TODO:? Optional[diffs.Diff]?
-        old: bodies.Body,  # TODO: or Any for field-handlers.
-        new: bodies.Body,  # TODO: or Any for field-handlers.
+        diff: diffs.Diff,
+        old: Optional[Union[bodies.BodyEssence, Any]],  # "Any" is for field-handlers.
+        new: Optional[Union[bodies.BodyEssence, Any]],  # "Any" is for field-handlers.
         **kwargs: Any,
     ) -> Any: ...
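
For reference, the JSON merge-patch semantics documented in `kopf/structs/patches.py` (introduced earlier in this series: dicts merge recursively, `None` deletes a field, anything else overrides) can be expressed in a few lines. The following is an illustrative sketch of RFC 7386, not kopf code:

.. code-block:: python

    from typing import Any, Dict

    def apply_merge_patch(target: Any, patch: Any) -> Any:
        # A non-dict patch replaces the target wholesale; a dict patch merges key by key.
        if not isinstance(patch, dict):
            return patch
        result: Dict[Any, Any] = dict(target) if isinstance(target, dict) else {}
        for key, value in patch.items():
            if value is None:
                result.pop(key, None)  # None means "delete this field"
            else:
                result[key] = apply_merge_patch(result.get(key), value)
        return result

    assert apply_merge_patch(
        {'metadata': {'labels': {'a': '1', 'b': '2'}}},
        {'metadata': {'labels': {'b': None, 'c': '3'}}},
    ) == {'metadata': {'labels': {'a': '1', 'c': '3'}}}
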