Skip to content

Commit

Permalink
[zalando-incubatorGH-33] implement auto detection mode for peering.
Browse files Browse the repository at this point in the history
  • Loading branch information
s-soroosh committed Apr 22, 2019
1 parent f8cbadf commit fdbcb71
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 11 deletions.
45 changes: 43 additions & 2 deletions kopf/reactor/peering.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import iso8601
import kubernetes
from kubernetes.client.rest import ApiException

from kopf.reactor.registry import Resource

Expand All @@ -65,7 +66,7 @@ def __init__(self,
self.id = id
self.peering = peering
self.namespace = namespace
self.priority = (priority)
self.priority = priority
self.lifetime = (lifetime if isinstance(lifetime, datetime.timedelta) else
datetime.timedelta(seconds=int(lifetime)))
self.lastseen = (lastseen if isinstance(lastseen, datetime.datetime) else
Expand Down Expand Up @@ -190,7 +191,7 @@ async def peers_keepalive(

# How often do we update. Keep limited to avoid k8s api flooding.
# Should be slightly less than the lifetime, enough for a patch request to finish.
await asyncio.sleep(max(1, ourselves.lifetime.total_seconds()-10))
await asyncio.sleep(max(1, int(ourselves.lifetime.total_seconds() - 10)))
finally:
try:
ourselves.disappear()
Expand Down Expand Up @@ -227,3 +228,43 @@ def detect_own_id() -> str:
now = datetime.datetime.utcnow().isoformat()
rnd = ''.join(random.choices('abcdefhijklmnopqrstuvwxyz0123456789', k=6))
return f'{user}@{host}/{now}/{rnd}'


class PeerFactory:
@staticmethod
def create_peer(standalone: bool,
peering: Optional[str],
**kwargs) -> Optional[Peer]:
if standalone:
return None

if peering:
if PeerFactory._is_peering_exist(peering):
return Peer(peering=peering, **kwargs)
else:
raise Exception(f"The peering {peering} was not found")

if PeerFactory._is_default_peering_setup():
return Peer(peering=peering, **kwargs)

logger.warning(f"The default peering object not found. Falling back to the Standalone mode...")
return None

# TODO: extend later to accept the namespace and call the get_namespaced_custom_object API if Namespace is availabl
@staticmethod
def _is_default_peering_setup():
return PeerFactory._is_peering_exist(PEERING_DEFAULT_NAME)

@staticmethod
def _is_peering_exist(peering: str):
api = kubernetes.client.CustomObjectsApi()
try:
api.get_cluster_custom_object(group=PEERING_CRD_RESOURCE.group,
version=PEERING_CRD_RESOURCE.version,
plural=PEERING_CRD_RESOURCE.plural,
name=peering)
return True
except ApiException as e:
if e.status == 404:
return False
raise
13 changes: 4 additions & 9 deletions kopf/reactor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@

from kopf.reactor.handling import custom_object_handler
from kopf.reactor.lifecycles import get_default_lifecycle
from kopf.reactor.peering import peers_keepalive, peers_handler, Peer, detect_own_id
from kopf.reactor.peering import PEERING_CRD_RESOURCE, PEERING_DEFAULT_NAME
from kopf.reactor.peering import peers_keepalive, peers_handler, Peer, detect_own_id, PeerFactory
from kopf.reactor.registry import get_default_registry, BaseRegistry, Resource
from kopf.reactor.watching import streaming_aiter

Expand Down Expand Up @@ -91,7 +91,7 @@ async def watcher(

# Ensure that the event is something we understand and can handle.
if event['type'] not in ['ADDED', 'MODIFIED', 'DELETED']:
logger.warn("Ignoring an unsupported event type: %r", event)
logger.warning("Ignoring an unsupported event type: %r", event)
continue

# Filter out all unrelated events as soon as possible (before queues), and silently.
Expand Down Expand Up @@ -183,13 +183,8 @@ def create_tasks(
tasks = []

# Monitor the peers, unless explicitly disabled.
if not standalone:
ourselves = Peer(
id=detect_own_id(),
priority=priority,
peering=peering,
namespace=namespace,
)
ourselves: Optional[Peer] = PeerFactory.create_peer(standalone, peering, id=detect_own_id(), priority=priority, namespace=namespace)
if ourselves:
tasks.extend([
asyncio.Task(peers_keepalive(ourselves=ourselves)),
asyncio.Task(watcher(namespace=None, # peering is cluster-object
Expand Down

0 comments on commit fdbcb71

Please sign in to comment.