Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #195 from nolar/typehints-resources
Browse files Browse the repository at this point in the history
Refactor resources & registries & filters — for type-hinting
  • Loading branch information
nolar authored Oct 5, 2019
2 parents 90b4f23 + 33d2d4a commit d848601
Show file tree
Hide file tree
Showing 17 changed files with 110 additions and 105 deletions.
6 changes: 3 additions & 3 deletions kopf/clients/watching.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from kopf.clients import auth
from kopf.clients import classes
from kopf.clients import fetching
from kopf.reactor import registries
from kopf.structs import resources

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,7 +69,7 @@ async def streaming_aiter(src, loop=None, executor=None):


async def infinite_watch(
resource: registries.Resource,
resource: resources.Resource,
namespace: Union[None, str],
):
"""
Expand All @@ -89,7 +89,7 @@ async def infinite_watch(


async def streaming_watch(
resource: registries.Resource,
resource: resources.Resource,
namespace: Union[None, str],
):
"""
Expand Down
9 changes: 5 additions & 4 deletions kopf/engines/peering.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@

from kopf.clients import fetching
from kopf.clients import patching
from kopf.reactor import registries
from kopf.structs import resources

logger = logging.getLogger(__name__)

# The CRD info on the special sync-object.
CLUSTER_PEERING_RESOURCE = registries.Resource('zalando.org', 'v1', 'clusterkopfpeerings')
NAMESPACED_PEERING_RESOURCE = registries.Resource('zalando.org', 'v1', 'kopfpeerings')
LEGACY_PEERING_RESOURCE = registries.Resource('zalando.org', 'v1', 'kopfpeerings')
CLUSTER_PEERING_RESOURCE = resources.Resource('zalando.org', 'v1', 'clusterkopfpeerings')
NAMESPACED_PEERING_RESOURCE = resources.Resource('zalando.org', 'v1', 'kopfpeerings')
LEGACY_PEERING_RESOURCE = resources.Resource('zalando.org', 'v1', 'kopfpeerings')
PEERING_DEFAULT_NAME = 'default'


Expand Down
4 changes: 2 additions & 2 deletions kopf/reactor/causation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import logging
from typing import NamedTuple, Text, Mapping, MutableMapping, Optional, Any, Union

from kopf.reactor import registries
from kopf.structs import diffs
from kopf.structs import finalizers
from kopf.structs import lastseen
from kopf.structs import resources

# Constants for event types, to prevent a direct usage of strings, and typos.
# They are not exposed by the framework, but are used internally. See also: `kopf.on`.
Expand Down Expand Up @@ -62,7 +62,7 @@ class Cause(NamedTuple):
of actual field changes, including multi-handler changes.
"""
logger: Union[logging.Logger, logging.LoggerAdapter]
resource: registries.Resource
resource: resources.Resource
event: Text
initial: bool
body: MutableMapping
Expand Down
41 changes: 21 additions & 20 deletions kopf/reactor/handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
from kopf.reactor import causation
from kopf.reactor import invocation
from kopf.reactor import registries
from kopf.reactor import state
from kopf.structs import dicts
from kopf.structs import diffs
from kopf.structs import finalizers
from kopf.structs import lastseen
from kopf.structs import status
from kopf.structs import resources

WAITING_KEEPALIVE_INTERVAL = 10 * 60
""" How often to wake up from the long sleep, to show the liveliness. """
Expand Down Expand Up @@ -71,7 +72,7 @@ class HandlerChildrenRetry(TemporaryError):
async def custom_object_handler(
lifecycle: Callable,
registry: registries.GlobalRegistry,
resource: registries.Resource,
resource: resources.Resource,
event: dict,
freeze: asyncio.Event,
replenished: asyncio.Event,
Expand Down Expand Up @@ -142,7 +143,7 @@ async def custom_object_handler(

async def handle_event(
registry: registries.BaseRegistry,
resource: registries.Resource,
resource: resources.Resource,
logger: logging_engine.ObjectLogger,
patch: dict,
event: dict,
Expand Down Expand Up @@ -177,7 +178,7 @@ async def handle_event(

else:
logger.info(f"Handler {handler.id!r} succeeded.", local=True)
status.store_result(patch=patch, handler=handler, result=result)
state.store_result(patch=patch, handler=handler, result=result)


async def handle_cause(
Expand Down Expand Up @@ -225,7 +226,7 @@ async def handle_cause(
extra_fields = registry.get_extra_fields(resource=cause.resource)
lastseen.refresh_state(body=body, patch=patch, extra_fields=extra_fields)
if done:
status.purge_progress(body=body, patch=patch)
state.purge_progress(body=body, patch=patch)
if cause.event == causation.DELETE:
logger.debug("Removing the finalizer, thus allowing the actual deletion.")
finalizers.remove_finalizers(body=body, patch=patch)
Expand Down Expand Up @@ -356,23 +357,23 @@ async def _execute(
logger = cause.logger

# Filter and select the handlers to be executed right now, on this event reaction cycle.
handlers_done = [h for h in handlers if status.is_finished(body=cause.body, handler=h)]
handlers_wait = [h for h in handlers if status.is_sleeping(body=cause.body, handler=h)]
handlers_todo = [h for h in handlers if status.is_awakened(body=cause.body, handler=h)]
handlers_done = [h for h in handlers if state.is_finished(body=cause.body, handler=h)]
handlers_wait = [h for h in handlers if state.is_sleeping(body=cause.body, handler=h)]
handlers_todo = [h for h in handlers if state.is_awakened(body=cause.body, handler=h)]
handlers_plan = [h for h in await invocation.invoke(lifecycle, handlers_todo, cause=cause)]
handlers_left = [h for h in handlers_todo if h.id not in {h.id for h in handlers_plan}]

# Set the timestamps -- even if not executed on this event, but just got registered.
for handler in handlers:
if not status.is_started(body=cause.body, handler=handler):
status.set_start_time(body=cause.body, patch=cause.patch, handler=handler)
if not state.is_started(body=cause.body, handler=handler):
state.set_start_time(body=cause.body, patch=cause.patch, handler=handler)

# Execute all planned (selected) handlers in one event reaction cycle, even if there are few.
for handler in handlers_plan:

# Restore the handler's progress status. It can be useful in the handlers.
retry = status.get_retry_count(body=cause.body, handler=handler)
started = status.get_start_time(body=cause.body, handler=handler, patch=cause.patch)
retry = state.get_retry_count(body=cause.body, handler=handler)
started = state.get_start_time(body=cause.body, handler=handler, patch=cause.patch)
runtime = datetime.datetime.utcnow() - started

# The exceptions are handled locally and are not re-raised, to keep the operator running.
Expand All @@ -394,42 +395,42 @@ async def _execute(
# Unfinished children cause the regular retry, but with less logging and event reporting.
except HandlerChildrenRetry as e:
logger.debug(f"Handler {handler.id!r} has unfinished sub-handlers. Will retry soon.")
status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay)
state.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay)
handlers_left.append(handler)

# Definitely a temporary error, regardless of the error strictness.
except TemporaryError as e:
logger.error(f"Handler {handler.id!r} failed temporarily: %s", str(e) or repr(e))
status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay)
state.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay)
handlers_left.append(handler)

# Same as permanent errors below, but with better logging for our internal cases.
except HandlerTimeoutError as e:
logger.error(f"%s", str(e) or repr(e)) # already formatted
status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e)
state.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e)
# TODO: report the handling failure somehow (beside logs/events). persistent status?

# Definitely a permanent error, regardless of the error strictness.
except PermanentError as e:
logger.error(f"Handler {handler.id!r} failed permanently: %s", str(e) or repr(e))
status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e)
state.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e)
# TODO: report the handling failure somehow (beside logs/events). persistent status?

# Regular errors behave as either temporary or permanent depending on the error strictness.
except Exception as e:
if retry_on_errors:
logger.exception(f"Handler {handler.id!r} failed with an exception. Will retry.")
status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=DEFAULT_RETRY_DELAY)
state.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=DEFAULT_RETRY_DELAY)
handlers_left.append(handler)
else:
logger.exception(f"Handler {handler.id!r} failed with an exception. Will stop.")
status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e)
state.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e)
# TODO: report the handling failure somehow (beside logs/events). persistent status?

# No errors means the handler should be excluded from future runs in this reaction cycle.
else:
logger.info(f"Handler {handler.id!r} succeeded.")
status.store_success(body=cause.body, patch=cause.patch, handler=handler, result=result)
state.store_success(body=cause.body, patch=cause.patch, handler=handler, result=result)

# Provoke the retry of the handling cycle if there were any unfinished handlers,
# either because they were not selected by the lifecycle, or failed and need a retry.
Expand All @@ -440,7 +441,7 @@ async def _execute(
# Other (non-delayed) handlers will continue normally, due to the raise a few lines above.
# Other objects will continue as normally in their own handling asyncio tasks.
if handlers_wait:
times = [status.get_awake_time(body=cause.body, handler=handler) for handler in handlers_wait]
times = [state.get_awake_time(body=cause.body, handler=handler) for handler in handlers_wait]
until = min(times) # the soonest awake datetime.
delay = (until - datetime.datetime.utcnow()).total_seconds()
delay = max(0, min(WAITING_KEEPALIVE_INTERVAL, delay))
Expand Down
4 changes: 2 additions & 2 deletions kopf/reactor/lifecycles.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import logging
import random

from kopf.structs import status
from kopf.reactor import state

logger = logging.getLogger(__name__)

Expand All @@ -39,7 +39,7 @@ def shuffled(handlers, **kwargs):

def asap(handlers, *, body, **kwargs):
""" Execute one handler at a time, skip on failure, try the next one, retry after the full cycle. """
retryfn = lambda handler: status.get_retry_count(body=body, handler=handler)
retryfn = lambda handler: state.get_retry_count(body=body, handler=handler)
return sorted(handlers, key=retryfn)[:1]


Expand Down
6 changes: 3 additions & 3 deletions kopf/reactor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

from kopf import config
from kopf.clients import watching
from kopf.reactor import registries
from kopf.structs import resources

logger = logging.getLogger(__name__)

Expand All @@ -44,7 +44,7 @@ class Stream(NamedTuple):


ObjectUid = NewType('ObjectUid', str)
ObjectRef = Tuple[registries.Resource, ObjectUid]
ObjectRef = Tuple[resources.Resource, ObjectUid]
Streams = MutableMapping[ObjectRef, Stream]

EOS = object()
Expand All @@ -54,7 +54,7 @@ class Stream(NamedTuple):
# TODO: add the label_selector support for the dev-mode?
async def watcher(
namespace: Union[None, str],
resource: registries.Resource,
resource: resources.Resource,
handler: Callable,
):
"""
Expand Down
67 changes: 43 additions & 24 deletions kopf/reactor/registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,7 @@
from types import FunctionType, MethodType
from typing import MutableMapping, NamedTuple, Text, Optional, Tuple, Callable, Mapping

from kopf.structs import filters


# An immutable reference to a custom resource definition.
class Resource(NamedTuple):
group: Text
version: Text
plural: Text

@property
def name(self):
return f'{self.plural}.{self.group}'

@property
def api_version(self):
# Strip heading/trailing slashes if group is absent (e.g. for pods).
return f'{self.group}/{self.version}'.strip('/')
from kopf.structs import resources as resources_


# A registered handler (function + event meta info).
Expand Down Expand Up @@ -151,12 +135,12 @@ def iter_cause_handlers(self, cause):
if handler.event is None or handler.event == cause.event:
if handler.initial and not cause.initial:
pass # ignore initial handlers in non-initial causes.
elif filters.match(handler=handler, body=cause.body, changed_fields=changed_fields):
elif match(handler=handler, body=cause.body, changed_fields=changed_fields):
yield handler

def iter_event_handlers(self, resource, event):
for handler in self._handlers:
if filters.match(handler=handler, body=event['object']):
if match(handler=handler, body=event['object']):
yield handler

def iter_extra_fields(self, resource):
Expand All @@ -167,7 +151,7 @@ def iter_extra_fields(self, resource):
def requires_finalizer(self, resource, body):
# check whether the body matches a deletion handler
for handler in self._handlers_requiring_finalizer:
if filters.match(handler=handler, body=body):
if match(handler=handler, body=body):
return True

return False
Expand Down Expand Up @@ -201,16 +185,16 @@ class GlobalRegistry(BaseRegistry):

def __init__(self):
super().__init__()
self._cause_handlers: MutableMapping[Resource, SimpleRegistry] = {}
self._event_handlers: MutableMapping[Resource, SimpleRegistry] = {}
self._cause_handlers: MutableMapping[resources_.Resource, SimpleRegistry] = {}
self._event_handlers: MutableMapping[resources_.Resource, SimpleRegistry] = {}

def register_cause_handler(self, group, version, plural, fn,
id=None, event=None, field=None, timeout=None, initial=None, requires_finalizer=False,
labels=None, annotations=None):
"""
Register an additional handler function for the specific resource and specific event.
"""
resource = Resource(group, version, plural)
resource = resources_.Resource(group, version, plural)
registry = self._cause_handlers.setdefault(resource, SimpleRegistry())
registry.register(event=event, field=field, fn=fn, id=id, timeout=timeout, initial=initial, requires_finalizer=requires_finalizer,
labels=labels, annotations=annotations)
Expand All @@ -221,7 +205,7 @@ def register_event_handler(self, group, version, plural, fn, id=None, labels=Non
"""
Register an additional handler function for low-level events.
"""
resource = Resource(group, version, plural)
resource = resources_.Resource(group, version, plural)
registry = self._event_handlers.setdefault(resource, SimpleRegistry())
registry.register(fn=fn, id=id, labels=labels, annotations=annotations)
return fn # to be usable as a decorator too.
Expand Down Expand Up @@ -289,3 +273,38 @@ def set_default_registry(registry: GlobalRegistry):
"""
global _default_registry
_default_registry = registry


def match(handler, body, changed_fields=None):
    """
    Decide whether a handler's filters accept this object/event.

    A handler matches when every filter it declares (field, labels,
    annotations) is satisfied; filters the handler does not declare
    are skipped. Checks are evaluated lazily, in the same order as
    declared: field first, then labels, then annotations.
    """
    if handler.field and not _matches_field(handler, changed_fields or []):
        return False
    if handler.labels and not _matches_labels(handler, body):
        return False
    if handler.annotations and not _matches_annotations(handler, body):
        return False
    return True


def _matches_field(handler, changed_fields):
return any(field[:len(handler.field)] == handler.field for field in changed_fields)


def _matches_labels(handler, body):
    """Check the handler's label filter against the object's ``metadata.labels``."""
    return _matches_metadata(handler, body, 'labels')


def _matches_annotations(handler, body):
    """Check the handler's annotation filter against the object's ``metadata.annotations``."""
    return _matches_metadata(handler, body, 'annotations')


def _matches_metadata(handler, body, metadata_type):
metadata = getattr(handler, metadata_type)
object_metadata = body.get('metadata', {}).get(metadata_type, {})

for key, value in metadata.items():
if key not in object_metadata:
return False
elif value is not None and value != object_metadata[key]:
return False
else:
continue

return True
File renamed without changes.
Loading

0 comments on commit d848601

Please sign in to comment.