From fdbcb710630b2a1366fcb94ea2231a4714dbda7b Mon Sep 17 00:00:00 2001 From: soroosh sarabadani Date: Mon, 22 Apr 2019 12:30:04 +0200 Subject: [PATCH 1/7] [GH-33] implement auto detection mode for peering. --- kopf/reactor/peering.py | 45 ++++++++++++++++++++++++++++++++++++++-- kopf/reactor/queueing.py | 13 ++++-------- 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/kopf/reactor/peering.py b/kopf/reactor/peering.py index 8041c001..79368ea4 100644 --- a/kopf/reactor/peering.py +++ b/kopf/reactor/peering.py @@ -39,6 +39,7 @@ import iso8601 import kubernetes +from kubernetes.client.rest import ApiException from kopf.reactor.registry import Resource @@ -65,7 +66,7 @@ def __init__(self, self.id = id self.peering = peering self.namespace = namespace - self.priority = (priority) + self.priority = priority self.lifetime = (lifetime if isinstance(lifetime, datetime.timedelta) else datetime.timedelta(seconds=int(lifetime))) self.lastseen = (lastseen if isinstance(lastseen, datetime.datetime) else @@ -190,7 +191,7 @@ async def peers_keepalive( # How often do we update. Keep limited to avoid k8s api flooding. # Should be slightly less than the lifetime, enough for a patch request to finish. - await asyncio.sleep(max(1, ourselves.lifetime.total_seconds()-10)) + await asyncio.sleep(max(1, int(ourselves.lifetime.total_seconds() - 10))) finally: try: ourselves.disappear() @@ -227,3 +228,43 @@ def detect_own_id() -> str: now = datetime.datetime.utcnow().isoformat() rnd = ''.join(random.choices('abcdefhijklmnopqrstuvwxyz0123456789', k=6)) return f'{user}@{host}/{now}/{rnd}' + + +class PeerFactory: + @staticmethod + def create_peer(standalone: bool, + peering: Optional[str], + **kwargs) -> Optional[Peer]: + if standalone: + return None + + if peering: + if PeerFactory._is_peering_exist(peering): + return Peer(peering=peering, **kwargs) + else: + raise Exception(f"The peering {peering} was not found") + + if PeerFactory._is_default_peering_setup(): + return Peer(peering=peering, **kwargs) + + logger.warning(f"The default peering object not found. Falling back to the Standalone mode...") + return None + + # TODO: extend later to accept the namespace and call the get_namespaced_custom_object API if Namespace is availabl + @staticmethod + def _is_default_peering_setup(): + return PeerFactory._is_peering_exist(PEERING_DEFAULT_NAME) + + @staticmethod + def _is_peering_exist(peering: str): + api = kubernetes.client.CustomObjectsApi() + try: + api.get_cluster_custom_object(group=PEERING_CRD_RESOURCE.group, + version=PEERING_CRD_RESOURCE.version, + plural=PEERING_CRD_RESOURCE.plural, + name=peering) + return True + except ApiException as e: + if e.status == 404: + return False + raise diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index fd9cbe43..abb9dedc 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -33,8 +33,8 @@ from kopf.reactor.handling import custom_object_handler from kopf.reactor.lifecycles import get_default_lifecycle -from kopf.reactor.peering import peers_keepalive, peers_handler, Peer, detect_own_id from kopf.reactor.peering import PEERING_CRD_RESOURCE, PEERING_DEFAULT_NAME +from kopf.reactor.peering import peers_keepalive, peers_handler, Peer, detect_own_id, PeerFactory from kopf.reactor.registry import get_default_registry, BaseRegistry, Resource from kopf.reactor.watching import streaming_aiter @@ -91,7 +91,7 @@ async def watcher( # Ensure that the event is something we understand and can handle. if event['type'] not in ['ADDED', 'MODIFIED', 'DELETED']: - logger.warn("Ignoring an unsupported event type: %r", event) + logger.warning("Ignoring an unsupported event type: %r", event) continue # Filter out all unrelated events as soon as possible (before queues), and silently. @@ -183,13 +183,8 @@ def create_tasks( tasks = [] # Monitor the peers, unless explicitly disabled. - if not standalone: - ourselves = Peer( - id=detect_own_id(), - priority=priority, - peering=peering, - namespace=namespace, - ) + ourselves: Optional[Peer] = PeerFactory.create_peer(standalone, peering, id=detect_own_id(), priority=priority, namespace=namespace) + if ourselves: tasks.extend([ asyncio.Task(peers_keepalive(ourselves=ourselves)), asyncio.Task(watcher(namespace=None, # peering is cluster-object From 9352306c9581217fde457e3e4c40d7844e18dfff Mon Sep 17 00:00:00 2001 From: soroosh sarabadani Date: Mon, 22 Apr 2019 12:36:52 +0200 Subject: [PATCH 2/7] [GH-33] switch to auto-mode if --peering is not set. --- kopf/cli.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kopf/cli.py b/kopf/cli.py index 8f2c4b4e..8651bc78 100644 --- a/kopf/cli.py +++ b/kopf/cli.py @@ -32,7 +32,7 @@ def main(): @click.option('-n', '--namespace', default=None) @click.option('--standalone', is_flag=True, default=False) @click.option('--dev', 'priority', flag_value=666) -@click.option('-P', '--peering', type=str, default=PEERING_DEFAULT_NAME) +@click.option('-P', '--peering', type=str, default=None) @click.option('-p', '--priority', type=int, default=0) @click.option('-m', '--module', 'modules', multiple=True) @click.argument('paths', nargs=-1) @@ -87,3 +87,6 @@ def resume(id, namespace, peering): namespace=namespace, ) ourselves.disappear() + + +main() From 5878d5b3eb1ca8ecdf4099cb9c1305794913d60e Mon Sep 17 00:00:00 2001 From: soroosh sarabadani Date: Mon, 22 Apr 2019 15:05:22 +0200 Subject: [PATCH 3/7] [GH-33] revert main() class in cli.py --- kopf/cli.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/kopf/cli.py b/kopf/cli.py index 8651bc78..5f64cb7c 100644 --- a/kopf/cli.py +++ b/kopf/cli.py @@ -87,6 +87,3 @@ def resume(id, namespace, peering): namespace=namespace, ) ourselves.disappear() - - -main() From 204230c83068fcf9a16600772579dd0d619efa75 Mon Sep 17 00:00:00 2001 From: soroosh sarabadani Date: Mon, 22 Apr 2019 15:09:06 +0200 Subject: [PATCH 4/7] [GH-33] introduce Peer.detect method. --- kopf/reactor/peering.py | 78 ++++++++++++++++++++-------------------- kopf/reactor/queueing.py | 4 +-- 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/kopf/reactor/peering.py b/kopf/reactor/peering.py index 79368ea4..9ce2d441 100644 --- a/kopf/reactor/peering.py +++ b/kopf/reactor/peering.py @@ -79,6 +79,26 @@ def __init__(self, def __repr__(self): return f"{self.__class__.__name__}({self.id}, namespace={self.namespace}, priority={self.priority}, lastseen={self.lastseen}, lifetime={self.lifetime})" + @classmethod + def detect(cls, + standalone: bool, + peering: Optional[str], + **kwargs) -> Optional: + if standalone: + return None + + if peering: + if Peer._is_peering_exist(peering): + return cls(peering=peering, **kwargs) + else: + raise Exception(f"The peering {peering} was not found") + + if Peer._is_default_peering_setup(): + return cls(peering=peering, **kwargs) + + logger.warning(f"The default peering object not found. Falling back to the Standalone mode...") + return None + def as_dict(self): # Only the non-calculated and non-identifying fields. return { @@ -110,6 +130,24 @@ def disappear(self): self.touch(lifetime=0) apply_peers([self], peering=self.peering) + @staticmethod + def _is_default_peering_setup(): + return Peer._is_peering_exist(PEERING_DEFAULT_NAME) + + @staticmethod + def _is_peering_exist(peering: str): + api = kubernetes.client.CustomObjectsApi() + try: + api.get_cluster_custom_object(group=PEERING_CRD_RESOURCE.group, + version=PEERING_CRD_RESOURCE.version, + plural=PEERING_CRD_RESOURCE.plural, + name=peering) + return True + except ApiException as e: + if e.status == 404: + return False + raise + def apply_peers( peers: Iterable[Peer], @@ -228,43 +266,3 @@ def detect_own_id() -> str: now = datetime.datetime.utcnow().isoformat() rnd = ''.join(random.choices('abcdefhijklmnopqrstuvwxyz0123456789', k=6)) return f'{user}@{host}/{now}/{rnd}' - - -class PeerFactory: - @staticmethod - def create_peer(standalone: bool, - peering: Optional[str], - **kwargs) -> Optional[Peer]: - if standalone: - return None - - if peering: - if PeerFactory._is_peering_exist(peering): - return Peer(peering=peering, **kwargs) - else: - raise Exception(f"The peering {peering} was not found") - - if PeerFactory._is_default_peering_setup(): - return Peer(peering=peering, **kwargs) - - logger.warning(f"The default peering object not found. Falling back to the Standalone mode...") - return None - - # TODO: extend later to accept the namespace and call the get_namespaced_custom_object API if Namespace is availabl - @staticmethod - def _is_default_peering_setup(): - return PeerFactory._is_peering_exist(PEERING_DEFAULT_NAME) - - @staticmethod - def _is_peering_exist(peering: str): - api = kubernetes.client.CustomObjectsApi() - try: - api.get_cluster_custom_object(group=PEERING_CRD_RESOURCE.group, - version=PEERING_CRD_RESOURCE.version, - plural=PEERING_CRD_RESOURCE.plural, - name=peering) - return True - except ApiException as e: - if e.status == 404: - return False - raise diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index abb9dedc..ddec3def 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -34,7 +34,7 @@ from kopf.reactor.handling import custom_object_handler from kopf.reactor.lifecycles import get_default_lifecycle from kopf.reactor.peering import PEERING_CRD_RESOURCE, PEERING_DEFAULT_NAME -from kopf.reactor.peering import peers_keepalive, peers_handler, Peer, detect_own_id, PeerFactory +from kopf.reactor.peering import peers_keepalive, peers_handler, Peer, detect_own_id from kopf.reactor.registry import get_default_registry, BaseRegistry, Resource from kopf.reactor.watching import streaming_aiter @@ -183,7 +183,7 @@ def create_tasks( tasks = [] # Monitor the peers, unless explicitly disabled. - ourselves: Optional[Peer] = PeerFactory.create_peer(standalone, peering, id=detect_own_id(), priority=priority, namespace=namespace) + ourselves: Optional[Peer] = Peer.detect(standalone, peering, id=detect_own_id(), priority=priority, namespace=namespace) if ourselves: tasks.extend([ asyncio.Task(peers_keepalive(ourselves=ourselves)), From 18924dbdb4e40e28c6cb55b734c7deb0ae423e68 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Mon, 22 Apr 2019 22:04:58 +0200 Subject: [PATCH 5/7] GH-33 fix issue with peering=None Co-Authored-By: psycho-ir --- kopf/reactor/peering.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kopf/reactor/peering.py b/kopf/reactor/peering.py index 9ce2d441..5c604ad8 100644 --- a/kopf/reactor/peering.py +++ b/kopf/reactor/peering.py @@ -94,7 +94,7 @@ def detect(cls, raise Exception(f"The peering {peering} was not found") if Peer._is_default_peering_setup(): - return cls(peering=peering, **kwargs) + return cls(peering=PEERING_DEFAULT_NAME, **kwargs) logger.warning(f"The default peering object not found. Falling back to the Standalone mode...") return None From d4e23c19224e5fac6fa936929384132fdc0b37c6 Mon Sep 17 00:00:00 2001 From: Soroosh Sarabadani Date: Tue, 23 Apr 2019 13:01:52 +0200 Subject: [PATCH 6/7] GH-33 sync peering doc with implementation --- docs/peering.rst | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/peering.rst b/docs/peering.rst index 880a5abc..46504c33 100644 --- a/docs/peering.rst +++ b/docs/peering.rst @@ -11,9 +11,10 @@ The operator can be instructed to use alternative peering objects:: The operators from different peering objects do not see each other. -The default peering name (i.e. if no peering or standalone options are provided) -is ``default``. +Default behavior +---------------- +If there is a peering object with name `default` then it's been used by default as the peering object. Otherwise kopf will run the operator in mode `Standalone`. Standalone mode --------------- From bd4a4fcfc2575b38d84797e2a0c27af2ae52a35e Mon Sep 17 00:00:00 2001 From: Soroosh Sarabadani Date: Wed, 24 Apr 2019 14:00:23 +0200 Subject: [PATCH 7/7] Update kopf/reactor/peering.py --- kopf/reactor/peering.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kopf/reactor/peering.py b/kopf/reactor/peering.py index 5c604ad8..e5d146c1 100644 --- a/kopf/reactor/peering.py +++ b/kopf/reactor/peering.py @@ -96,7 +96,7 @@ def detect(cls, if Peer._is_default_peering_setup(): return cls(peering=PEERING_DEFAULT_NAME, **kwargs) - logger.warning(f"The default peering object not found. Falling back to the Standalone mode...") + logger.warning(f"Default peering object not found, falling back to the Standalone mode.") return None def as_dict(self):