From 5b465fc42e6503c9cfc005dd13e077f69645790c Mon Sep 17 00:00:00 2001
From: hugo
Date: Mon, 9 Sep 2024 11:02:29 +0200
Subject: [PATCH] Revert "Integrate scheduler with server (Issue #8009, PR
 #8050)"

This reverts commit 22eab725f649b81ad01937fc577a31571d0366fa.
---
 .../unreleased/8009-server-scheduler.yml      |   4 -
 mypy-baseline.txt                             |   9 -
 src/inmanta/agent/agent.py                    |   7 -
 src/inmanta/agent/agent_new.py                | 265 --------------
 src/inmanta/app.py                            |  25 --
 src/inmanta/const.py                          |   4 -
 src/inmanta/deploy/scheduler.py               |  78 +----
 src/inmanta/deploy/work.py                    |   2 +-
 src/inmanta/protocol/methods.py               |  17 -
 src/inmanta/server/agentmanager.py            | 124 +------
 src/inmanta/server/config.py                  |   4 -
 .../server/services/compilerservice.py        |   1 -
 src/inmanta/server/services/dryrunservice.py  |  22 +-
 .../server/services/environmentservice.py     |  71 ++--
 .../server/services/orchestrationservice.py   |  54 +--
 .../server/services/resourceservice.py        |   7 +-
 tests/deploy/test_scheduler.py                | 328 ------------------
 tests/test_project.py                         |  13 +-
 18 files changed, 92 insertions(+), 943 deletions(-)
 delete mode 100644 changelogs/unreleased/8009-server-scheduler.yml
 delete mode 100644 src/inmanta/agent/agent_new.py
 delete mode 100644 tests/deploy/test_scheduler.py

diff --git a/changelogs/unreleased/8009-server-scheduler.yml b/changelogs/unreleased/8009-server-scheduler.yml
deleted file mode 100644
index d8cc5dd622..0000000000
--- a/changelogs/unreleased/8009-server-scheduler.yml
+++ /dev/null
@@ -1,4 +0,0 @@
-description: Integrate scheduler with server
-issue-nr: 8009
-change-type: minor
-destination-branches: [master]
diff --git a/mypy-baseline.txt b/mypy-baseline.txt
index b86aeb1720..f8211efed4 100644
--- a/mypy-baseline.txt
+++ b/mypy-baseline.txt
@@ -373,7 +373,6 @@ src/inmanta/protocol/methods.py:0: error: Function is missing a return type anno
 src/inmanta/protocol/methods.py:0: note: Use "-> None" if function does not return a value
 src/inmanta/protocol/methods.py:0: error: Function is missing a return type annotation [no-untyped-def]
 src/inmanta/protocol/methods.py:0: error: Function is missing a return type annotation [no-untyped-def]
-src/inmanta/protocol/methods.py:0: error: Missing return statement [empty-body]
 src/inmanta/protocol/methods.py:0: error: Function is missing a return type annotation [no-untyped-def]
 src/inmanta/protocol/methods.py:0: error: Function is missing a type annotation for one or more arguments [no-untyped-def]
 src/inmanta/protocol/methods.py:0: error: Function is missing a return type annotation [no-untyped-def]
@@ -969,7 +968,6 @@ src/inmanta/app.py:0: error: Missing type parameters for generic type "Future"
 src/inmanta/app.py:0: error: Argument 2 to "safe_shutdown" has incompatible type "Callable[[int | None], Coroutine[Any, Any, None]]"; expected "Callable[[], None]" [arg-type]
 src/inmanta/app.py:0: error: Argument 2 to "safe_shutdown" has incompatible type "Callable[[int | None], Coroutine[Any, Any, None]]"; expected "Callable[[], None]" [arg-type]
 src/inmanta/app.py:0: error: Incompatible types in assignment (expression has type "str | None", variable has type "int | None") [assignment]
-src/inmanta/app.py:0: error: Incompatible types in assignment (expression has type "str | None", variable has type "int | None") [assignment]
 src/inmanta/app.py:0: error: Argument 1 to "run" of "Exporter" has incompatible type "dict[str, Type]"; expected "dict[str, Entity] | None" [arg-type]
 src/inmanta/app.py:0: error: No overload variant of "print" matches argument types "str", "TextIO | Any", "dict[str, object]" [call-overload]
 src/inmanta/app.py:0: note: Possible overload variants:
@@ -990,16 +988,9 @@ src/inmanta/compiler/help/explainer.py:0: error: Item "None" of "RelationAttribu
 src/inmanta/agent/resourcepool.py:0: error: Missing type parameters for generic type "PoolMember" [type-arg]
 src/inmanta/agent/resourcepool.py:0: error: Argument 1 to "append" of "list" has incompatible type "Callable[[TPoolMember], Coroutine[Any, Any, bool]]"; expected "Callable[[PoolMember[TPoolID]], Coroutine[Any, Any, Any]]" [arg-type]
 src/inmanta/agent/resourcepool.py:0: error: Missing type parameters for generic type "PoolMember" [type-arg]
-src/inmanta/deploy/scheduler.py:0: error: Argument "attribute_hash" to "ResourceDetails" has incompatible type "str | None"; expected "str" [arg-type]
-src/inmanta/deploy/scheduler.py:0: error: No overload variant of "list" matches argument type "object" [call-overload]
-src/inmanta/deploy/scheduler.py:0: note: Possible overload variants:
-src/inmanta/deploy/scheduler.py:0: note:     def [_T] __init__(self) -> list[_T]
-src/inmanta/deploy/scheduler.py:0: note:     def [_T] __init__(self, Iterable[_T], /) -> list[_T]
 src/inmanta/agent/forking_executor.py:0: error: Argument 1 of "connection_made" is incompatible with supertype "BaseProtocol"; supertype defines the argument type as "BaseTransport" [override]
 src/inmanta/agent/forking_executor.py:0: note: This violates the Liskov substitution principle
 src/inmanta/agent/forking_executor.py:0: note: See https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
-src/inmanta/agent/agent_new.py:0: error: Argument 3 to "MPManager" has incompatible type "UUID | None"; expected "UUID" [arg-type]
-src/inmanta/agent/agent_new.py:0: error: Argument 1 to "collect_report" has incompatible type "inmanta.agent.agent_new.Agent"; expected "inmanta.agent.agent.Agent" [arg-type]
 src/inmanta/agent/agent.py:0: error: Cannot determine type of "ratelimiter" [has-type]
 src/inmanta/validation_type.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "Mapping[str, object] | None") [assignment]
 src/inmanta/server/validate_filter.py:0: error: Incompatible return value type (got "Callable[[object], list[tuple[RangeOperator, int]] | None]", expected "Callable[[object, object], list[tuple[RangeOperator, int]] | None]") [return-value]
diff --git a/src/inmanta/agent/agent.py b/src/inmanta/agent/agent.py
index 3bf62478d7..716b62ceac 100644
--- a/src/inmanta/agent/agent.py
+++ b/src/inmanta/agent/agent.py
@@ -1330,13 +1330,6 @@ async def trigger_update(self, env: uuid.UUID, agent: str, incremental_deploy: b
         )
         return 200

-    @protocol.handle(methods.trigger_read_version, env="tid")
-    async def read_version(self, env: uuid.UUID) -> Apireturn:
-        """
-        Send a notification to the agent that a new version has been released
-        """
-        pass
-
     @protocol.handle(methods.resource_event, env="tid", agent="id")
     async def resource_event(
         self,
diff --git a/src/inmanta/agent/agent_new.py b/src/inmanta/agent/agent_new.py
deleted file mode 100644
index ee66e30660..0000000000
--- a/src/inmanta/agent/agent_new.py
+++ /dev/null
@@ -1,265 +0,0 @@
-"""
-    Copyright 2024 Inmanta
-
-    Licensed under the Apache License, Version 2.0 (the "License");
-    you may not use this file except in compliance with the License.
-    You may obtain a copy of the License at
-
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
-    Contact: code@inmanta.com
-"""
-
-import asyncio
-import logging
-import os
-import uuid
-from concurrent.futures.thread import ThreadPoolExecutor
-from typing import Any, Optional
-
-from inmanta import config, const, protocol
-from inmanta.agent import config as cfg
-from inmanta.agent import executor, forking_executor
-from inmanta.agent.reporting import collect_report
-from inmanta.const import AGENT_SCHEDULER_ID
-from inmanta.data.model import AttributeStateChange, ResourceVersionIdStr
-from inmanta.deploy.scheduler import ResourceScheduler
-from inmanta.protocol import SessionEndpoint, methods, methods_v2
-from inmanta.types import Apireturn
-from inmanta.util import join_threadpools
-
-LOGGER = logging.getLogger(__name__)
-
-
-class Agent(SessionEndpoint):
-    """
-    An agent to enact changes upon resources. This agent listens to the
-    message bus for changes.
-    """
-
-    # cache reference to THIS ioloop for handlers to push requests on it
-    # defer to start, just to be sure
-    _io_loop: asyncio.AbstractEventLoop
-
-    def __init__(
-        self,
-        environment: Optional[uuid.UUID] = None,
-    ):
-        """
-        :param environment: environment id
-        """
-        super().__init__("agent", timeout=cfg.server_timeout.get(), reconnect_delay=cfg.agent_reconnect_delay.get())
-
-        self.thread_pool = ThreadPoolExecutor(1, thread_name_prefix="mainpool")
-        self._storage = self.check_storage()
-
-        if environment is None:
-            environment = cfg.environment.get()
-            if environment is None:
-                raise Exception("The agent requires an environment to be set.")
-        self.set_environment(environment)
-        assert self.environment is not None
-
-        self.executor_manager: executor.ExecutorManager[executor.Executor] = self.create_executor_manager()
-        self.scheduler = ResourceScheduler(self.environment)
-        self.working = False
-
-    def create_executor_manager(self) -> executor.ExecutorManager[executor.Executor]:
-        # To override in testing
-        return forking_executor.MPManager(
-            self.thread_pool,
-            self.sessionid,
-            self.environment,
-            config.log_dir.get(),
-            self._storage["executor"],
-            LOGGER.level,
-            False,
-        )
-
-    async def stop(self) -> None:
-        await super().stop()
-
-        if self.working:
-            await self.stop_working()
-
-        threadpools_to_join = [self.thread_pool]
-
-        await self.executor_manager.join(threadpools_to_join, const.SHUTDOWN_GRACE_IOLOOP * 0.9)
-
-        self.thread_pool.shutdown(wait=False)
-
-        await join_threadpools(threadpools_to_join)
-
-    async def start_connected(self) -> None:
-        """
-        This method is required because:
-            1) The client transport is required to retrieve the autostart_agent_map from the server.
-            2) _init_endpoint_names() needs to be an async method and async calls are not possible in a constructor.
-        """
-        await self.add_end_point_name(AGENT_SCHEDULER_ID)
-
-    async def start(self) -> None:
-        # cache reference to THIS ioloop for handlers to push requests on it
-        self._io_loop = asyncio.get_running_loop()
-        await super().start()
-
-    async def start_working(self) -> None:
-        """Start working, once we have a session"""
-        # Todo: recycle them when we restart
-        if self.working:
-            return
-        self.working = True
-        await self.scheduler.start()
-        await self.executor_manager.start()
-
-    async def stop_working(self) -> None:
-        """Stop working"""
-        if not self.working:
-            return
-        # Todo: recycle them when we restart
-        self.working = False
-        await self.executor_manager.stop()
-        await self.scheduler.stop()
-
-    @protocol.handle(methods_v2.update_agent_map)
-    async def update_agent_map(self, agent_map: dict[str, str]) -> None:
-        # Not used here
-        pass
-
-    async def unpause(self, name: str) -> Apireturn:
-        if not name == AGENT_SCHEDULER_ID:
-            return 404, "No such agent"
-
-        await self.start_working()
-        return 200
-
-    async def pause(self, name: str) -> Apireturn:
-        if not name == AGENT_SCHEDULER_ID:
-            return 404, "No such agent"
-
-        await self.stop_working()
-        return 200
-
-    @protocol.handle(methods.set_state)
-    async def set_state(self, agent: str, enabled: bool) -> Apireturn:
-        if enabled:
-            return await self.unpause(agent)
-        else:
-            return await self.pause(agent)
-
-    async def on_reconnect(self) -> None:
-        name = AGENT_SCHEDULER_ID
-        result = await self._client.get_state(tid=self._env_id, sid=self.sessionid, agent=name)
-        if result.code == 200 and result.result is not None:
-            state = result.result
-            if "enabled" in state and isinstance(state["enabled"], bool):
-                await self.set_state(name, state["enabled"])
-            else:
-                LOGGER.warning("Server reported invalid state %s" % (repr(state)))
-        else:
-            LOGGER.warning("could not get state from the server")
-
-    async def on_disconnect(self) -> None:
-        await self.stop_working()
-
-    @protocol.handle(methods.trigger, env="tid", agent="id")
-    async def trigger_update(self, env: uuid.UUID, agent: str, incremental_deploy: bool) -> Apireturn:
-        """
-        Trigger an update
-        """
-        assert env == self.environment
-        assert agent == AGENT_SCHEDULER_ID
-        if incremental_deploy:
-            await self.scheduler.deploy()
-        else:
-            await self.scheduler.repair()
-        return 200
-
-    @protocol.handle(methods.release_version, env="tid", agent="id")
-    async def read_version(self, env: uuid.UUID, agent: str, _: bool) -> Apireturn:
-        """
-        Send a notification to the scheduler that a new version has been released
-        """
-        assert env == self.environment
-        assert agent == AGENT_SCHEDULER_ID
-        await self.scheduler.new_version()
-        return 200
-
-    @protocol.handle(methods.resource_event, env="tid", agent="id")
-    async def resource_event(
-        self,
-        env: uuid.UUID,
-        agent: str,
-        resource: ResourceVersionIdStr,
-        send_events: bool,
-        state: const.ResourceState,
-        change: const.Change,
-        changes: dict[ResourceVersionIdStr, dict[str, AttributeStateChange]],
-    ) -> Apireturn:
-        # Doesn't do anything
-        pass
-
-    @protocol.handle(methods.do_dryrun, env="tid", dry_run_id="id")
-    async def run_dryrun(self, env: uuid.UUID, dry_run_id: uuid.UUID, agent: str, version: int) -> Apireturn:
-        """
-        Run a dryrun of the given version
-        """
-        assert env == self.environment
-        assert agent == AGENT_SCHEDULER_ID
-        LOGGER.info("Agent %s got a trigger to run dryrun %s for version %s in environment %s", agent, dry_run_id, version, env)
-
-        await self.scheduler.dryrun(dry_run_id, version)
-        return 200
-
-    @protocol.handle(methods.get_parameter, env="tid")
-    async def get_facts(self, env: uuid.UUID, agent: str, resource: dict[str, Any]) -> Apireturn:
-        assert env == self.environment
-        assert agent == AGENT_SCHEDULER_ID
-        await self.scheduler.get_facts(resource)
-        return 200
-
-    @protocol.handle(methods.get_status)
-    async def get_status(self) -> Apireturn:
-        return 200, collect_report(self)
-
-    def check_storage(self) -> dict[str, str]:
-        """
-        Check if the server storage is configured and ready to use.
-        """
-
-        # FIXME: review on disk layout: https://github.com/inmanta/inmanta-core/issues/7590
-
-        state_dir = cfg.state_dir.get()
-
-        if not os.path.exists(state_dir):
-            os.mkdir(state_dir)
-
-        agent_state_dir = os.path.join(state_dir, "agent")
-
-        if not os.path.exists(agent_state_dir):
-            os.mkdir(agent_state_dir)
-
-        dir_map = {"agent": agent_state_dir}
-
-        code_dir = os.path.join(agent_state_dir, "code")
-        dir_map["code"] = code_dir
-        if not os.path.exists(code_dir):
-            os.mkdir(code_dir)
-
-        env_dir = os.path.join(agent_state_dir, "env")
-        dir_map["env"] = env_dir
-        if not os.path.exists(env_dir):
-            os.mkdir(env_dir)
-
-        executor_dir = os.path.join(agent_state_dir, "executor")
-        dir_map["executor"] = executor_dir
-        if not os.path.exists(executor_dir):
-            os.mkdir(executor_dir)
-
-        return dir_map
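The deleted Agent above guards its lifecycle with a single `working` flag: `start_working()` and `stop_working()` are no-ops when the agent is already in the requested state, so repeated pause/unpause requests and disconnects never double-start the scheduler or the executor manager. A minimal standalone sketch of that guard (illustrative names, not the inmanta API):

    import asyncio


    class WorkerLifecycle:
        # Idempotent start/stop toggle, mirroring start_working()/stop_working() above.

        def __init__(self) -> None:
            self.working = False

        async def start_working(self) -> None:
            if self.working:
                return  # already running: repeated unpause requests are safe
            self.working = True
            # start sub-components in dependency order (e.g. scheduler, then executors)

        async def stop_working(self) -> None:
            if not self.working:
                return  # already stopped: a disconnect may race with a pause
            self.working = False
            # stop sub-components in reverse order


    async def main() -> None:
        w = WorkerLifecycle()
        await w.start_working()
        await w.start_working()  # no-op thanks to the flag
        await w.stop_working()


    asyncio.run(main())
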
diff --git a/src/inmanta/app.py b/src/inmanta/app.py
index b91254741e..8b99a6bab5 100644
--- a/src/inmanta/app.py
+++ b/src/inmanta/app.py
@@ -142,31 +142,6 @@ def start_agent(options: argparse.Namespace) -> None:
     LOGGER.info("Agent Shutdown complete")

-@command("scheduler", help_msg="Start the resource scheduler")
-def start_scheduler(options: argparse.Namespace) -> None:
-    """
-    Start the new agent with the Resource Scheduler
-    """
-    from inmanta.agent import agent_new
-
-    # The call to configure() should be done as soon as possible.
-    # If an AsyncHTTPClient is started before this call, the max_client
-    # will not be taken into account.
-    max_clients: Optional[int] = Config.get(section="agent_rest_transport", name="max_clients")
-
-    if max_clients:
-        AsyncHTTPClient.configure(None, max_clients=max_clients)
-
-    tracing.configure_logfire("agent_rs")
-    util.ensure_event_loop()
-    a = agent_new.Agent()
-
-    setup_signal_handlers(a.stop)
-    IOLoop.current().add_callback(a.start)
-    IOLoop.current().start()
-    LOGGER.info("Agent with Resource scheduler Shutdown complete")
-
-
 class ExperimentalFeatureFlags:
     """
     Class to expose feature flag configs as options in a uniform manner
diff --git a/src/inmanta/const.py b/src/inmanta/const.py
index 19b9bb574c..e9f2c34375 100644
--- a/src/inmanta/const.py
+++ b/src/inmanta/const.py
@@ -356,7 +356,3 @@ class NotificationSeverity(str, Enum):
 # It's used to determine whether a venv is only partially created (due to a server crash for example) or to determine when the
 # venv was last used.
 INMANTA_VENV_STATUS_FILENAME = ".inmanta_venv_status"
-
-
-# ID to represent the new scheduler as an agent
-AGENT_SCHEDULER_ID = "$__scheduler"
diff --git a/src/inmanta/deploy/scheduler.py b/src/inmanta/deploy/scheduler.py
index 8dfda42ab3..381ee03f3a 100644
--- a/src/inmanta/deploy/scheduler.py
+++ b/src/inmanta/deploy/scheduler.py
@@ -17,20 +17,13 @@
 """

 import asyncio
-import logging
-import uuid
-from collections.abc import Set
-from typing import Any, Mapping, Optional
+from collections.abc import Mapping, Set
+from typing import Optional

-from inmanta import data, resources
-from inmanta.data import Resource
 from inmanta.data.model import ResourceIdStr
 from inmanta.deploy import work
 from inmanta.deploy.state import ModelState, ResourceDetails, ResourceStatus

-LOGGER = logging.getLogger(__name__)
-
-
 # FIXME[#8008] review code structure + functionality + add docstrings
 # FIXME[#8008] add import entry point test case

@@ -43,10 +36,7 @@ class ResourceScheduler:
     The scheduler expects to be notified by the server whenever a new version is released.
     """

-    def __init__(self, environment: uuid.UUID) -> None:
-        """
-        :param environment: the environment we work for
-        """
+    def __init__(self) -> None:
         self._state: ModelState = ModelState(version=0)
         self._work: work.ScheduledWork = work.ScheduledWork(
             requires=self._state.requires.requires_view(),
@@ -62,12 +52,9 @@ def __init__(self, environment: uuid.UUID) -> None:
         self._scheduler_lock: asyncio.Lock = asyncio.Lock()
         # - lock to serialize scheduler state updates (i.e. process new version)
         self._update_lock: asyncio.Lock = asyncio.Lock()
-        self._environment = environment

     async def start(self) -> None:
-        await self.new_version()
-
-    async def stop(self) -> None:
+        # FIXME[#8009]: read from DB instead
         pass

     async def deploy(self) -> None:
@@ -83,71 +70,34 @@ async def repair(self) -> None:
         # FIXME[#8008]: implement repair
         pass

-    async def dryrun(self, dry_run_id: uuid.UUID, version: int) -> None:
-        # FIXME
-        pass
-
-    async def get_facts(self, resource: dict[str, Any]) -> None:
-        # FIXME, also clean up typing of arguments
-        pass
-
-    async def build_resource_mappings_from_db(
-        self,
-    ) -> tuple[Mapping[ResourceIdStr, ResourceDetails], Mapping[ResourceIdStr, Set[ResourceIdStr]]]:
-        """
-        Build a view on current resources. Might be filtered for a specific environment, used when a new version is released
-
-        :return: resource_mapping {id -> resource details} and require_mapping {id -> requires}
-        """
-        resources_from_db: list[Resource] = await data.Resource.get_resources_in_latest_version(environment=self._environment)
-
-        resource_mapping = {
-            resource.resource_id: ResourceDetails(attribute_hash=resource.attribute_hash, attributes=resource.attributes)
-            for resource in resources_from_db
-        }
-        require_mapping = {
-            resource.resource_id: {
-                resources.Id.parse_id(req).resource_str() for req in list(resource.attributes.get("requires", []))
-            }
-            for resource in resources_from_db
-        }
-        return resource_mapping, require_mapping
-
+    # FIXME[#8009]: design step 2: read new state from DB instead of accepting as parameter
+    # (method should be notification only, i.e. 0 parameters)
     async def new_version(
         self,
+        version: int,
+        resources: Mapping[ResourceIdStr, ResourceDetails],
+        requires: Mapping[ResourceIdStr, Set[ResourceIdStr]],
     ) -> None:
-        """
-        Method that is used as a notification from the Server to retrieve the latest data concerning the release of the
-        latest version. This method will fetch the latest version and the different resources in their latest version.
-        It will then compute the work that needs to be done (resources to create / delete / update) to be up to date with
-        this new version.
-        """
-        environment = await data.Environment.get_by_id(self._environment)
-        if environment is None:
-            raise ValueError(f"No environment found with this id: `{self._environment}`")
-        version = environment.last_version
-        resources_from_db, requires_from_db = await self.build_resource_mappings_from_db()
         async with self._update_lock:
             # Inspect new state and mark resources as "update pending" where appropriate. Since this method is the only writer
             # for "update pending", and a stale read is acceptable, we can do this part before acquiring the exclusive scheduler
             # lock.
-            deleted_resources: Set[ResourceIdStr] = self._state.resources.keys() - resources_from_db.keys()
+            deleted_resources: Set[ResourceIdStr] = self._state.resources.keys() - resources.keys()
             for resource in deleted_resources:
                 self._work.delete_resource(resource)

             new_desired_state: list[ResourceIdStr] = []
             added_requires: dict[ResourceIdStr, Set[ResourceIdStr]] = {}
             dropped_requires: dict[ResourceIdStr, Set[ResourceIdStr]] = {}
-            for resource, details in resources_from_db.items():
+            for resource, details in resources.items():
                 if (
                     resource not in self._state.resources
                     or details.attribute_hash != self._state.resources[resource].attribute_hash
                 ):
                     self._state.update_pending.add(resource)
                     new_desired_state.append(resource)
+                new_requires: Set[ResourceIdStr] = requires.get(resource, set())
                 old_requires: Set[ResourceIdStr] = self._state.requires.get(resource, set())
-                new_requires: Set[ResourceIdStr] = requires_from_db.get(resource, set())
                 added: Set[ResourceIdStr] = new_requires - old_requires
                 dropped: Set[ResourceIdStr] = old_requires - new_requires
                 if added:
@@ -168,9 +118,9 @@ async def new_version(
             async with self._scheduler_lock:
                 self._state.version = version
                 for resource in new_desired_state:
-                    self._state.update_desired_state(resource, resources_from_db[resource])
+                    self._state.update_desired_state(resource, resources[resource])
                 for resource in added_requires.keys() | dropped_requires.keys():
-                    self._state.update_requires(resource, requires_from_db[resource])
+                    self._state.update_requires(resource, requires[resource])

                 # ensure deploy for ALL dirty resources, not just the new ones
                 # FIXME[#8008]: this is copy-pasted, make into a method?
                 dirty: Set[ResourceIdStr] = {
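The hunk above preserves the scheduler's two-lock discipline: the outer `_update_lock` serializes whole "new version" computations (the diffing may read a stale snapshot because this method is the only writer of "update pending"), while the inner `_scheduler_lock` makes the final state swap atomic for concurrent readers. A reduced sketch of the same pattern, with hypothetical names and plain dicts standing in for `ModelState`:

    import asyncio
    from typing import Mapping, Set


    class MiniScheduler:
        def __init__(self) -> None:
            self._update_lock = asyncio.Lock()     # serializes "process new version" runs
            self._scheduler_lock = asyncio.Lock()  # guards the committed state below
            self.version = 0
            self.resources: dict[str, str] = {}    # resource id -> attribute hash

        async def new_version(self, version: int, resources: Mapping[str, str]) -> None:
            async with self._update_lock:
                # Cheap diffing outside the scheduler lock: this coroutine is the
                # only writer, so a stale read of self.resources is harmless here.
                changed: Set[str] = {
                    rid for rid, h in resources.items() if self.resources.get(rid) != h
                }
                async with self._scheduler_lock:
                    # Atomic commit: readers never observe a half-applied version.
                    # (A real scheduler would also handle deletions and queue work.)
                    self.version = version
                    for rid in changed:
                        self.resources[rid] = resources[rid]


    asyncio.run(MiniScheduler().new_version(1, {"r1": "hash1"}))
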
diff --git a/src/inmanta/deploy/work.py b/src/inmanta/deploy/work.py
index 7c03636806..b69321fa0b 100644
--- a/src/inmanta/deploy/work.py
+++ b/src/inmanta/deploy/work.py
@@ -184,7 +184,7 @@ def queue_put_nowait(self, prioritized_task: PrioritizedTask[Task]) -> None:
             self._tasks_by_resource[task.resource] = {}
         self._tasks_by_resource[task.resource][task] = item
         # FIXME[#8008]: parse agent
-        # self._agent_queues["TODO"].put_nowait(item)
+        self._agent_queues["TODO"].put_nowait(item)

     async def queue_get(self, agent: str) -> Task:
         """
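The restored `put_nowait` line routes each task item into the queue of the agent that owns the resource (the "TODO" placeholder still needs the agent name parsed from the resource id, per the FIXME above it). The underlying structure is one in-memory queue per agent, created lazily, so each agent consumes only its own work; a minimal sketch:

    import asyncio


    class AgentQueues:
        def __init__(self) -> None:
            self._queues: dict[str, asyncio.Queue[str]] = {}

        def _queue_for(self, agent: str) -> asyncio.Queue[str]:
            # Create the agent's queue on first use.
            return self._queues.setdefault(agent, asyncio.Queue())

        def put_nowait(self, agent: str, task: str) -> None:
            self._queue_for(agent).put_nowait(task)

        async def get(self, agent: str) -> str:
            return await self._queue_for(agent).get()


    async def main() -> None:
        q = AgentQueues()
        q.put_nowait("agent1", "deploy r1")
        print(await q.get("agent1"))  # only agent1's consumer sees this task


    asyncio.run(main())
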
""" - environment_service: environmentservice.EnvironmentService - def __init__(self) -> None: super().__init__(SLICE_AUTOSTARTED_AGENT_MANAGER) self._agent_procs: dict[UUID, subprocess.Process] = {} # env uuid -> subprocess.Process @@ -964,9 +957,6 @@ async def prestart(self, server: protocol.Server) -> None: assert isinstance(agent_manager, AgentManager) self._agent_manager = agent_manager - self.environment_service = cast(environmentservice.EnvironmentService, server.get_slice(SLICE_ENVIRONMENT)) - self.environment_service.register_listener(self, environmentservice.EnvironmentAction.created) - async def start(self) -> None: await super().start() self.add_background_task(self._start_agents()) @@ -992,24 +982,16 @@ async def _start_agents(self) -> None: environments = await data.Environment.get_list() for env in environments: autostart = await env.get(data.AUTOSTART_ON_START) - if not autostart: - continue - - if opt.server_use_resource_scheduler.get(): - await self._ensure_scheduler(env) - else: + if autostart: agents = await data.Agent.get_list(environment=env.id) - agent_list = {a.name for a in agents} + agent_list = [a.name for a in agents] await self._ensure_agents(env, agent_list) async def restart_agents(self, env: data.Environment) -> None: LOGGER.debug("Restarting agents in environment %s", env.id) - if opt.server_use_resource_scheduler.get(): - await self._ensure_scheduler(env) - else: - agents = await data.Agent.get_list(environment=env.id) - agent_list = [a.name for a in agents] - await self._ensure_agents(env, agent_list, restart=True) + agents = await data.Agent.get_list(environment=env.id) + agent_list = [a.name for a in agents] + await self._ensure_agents(env, agent_list, restart=True) async def stop_agents( self, @@ -1176,78 +1158,6 @@ async def _ensure_agents( LOGGER.warning("Not all agent instances started successfully") return start_new_process - # Start/Restart scheduler - async def _ensure_scheduler( - self, - env: data.Environment, - *, - restart: bool = False, - connection: Optional[asyncpg.connection.Connection] = None, - ) -> bool: - """ - Ensure that all agents defined in the current environment (model) and that should be autostarted, are started. - - :param env: The environment to start the agents for - :param restart: Restart all agents even if the list of agents is up to date. - :param connection: The database connection to use. Must not be in a transaction context. - - :return: True iff a new agent process was started. - """ - if self._stopping: - raise ShutdownInProgress() - - if connection is not None and connection.is_in_transaction(): - # Should not be called in a transaction context because it has (immediate) side effects outside of the database - # that are tied to the database state. Several inconsistency issues could occur if this runs in a transaction - # context: - # - side effects based on oncommitted reads (may even need to be rolled back) - # - race condition with similar side effect flows due to stale reads (e.g. 
diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py
index a110c54e85..13e4968cae 100644
--- a/src/inmanta/server/agentmanager.py
+++ b/src/inmanta/server/agentmanager.py
@@ -48,7 +48,6 @@
     SLICE_AGENT_MANAGER,
     SLICE_AUTOSTARTED_AGENT_MANAGER,
     SLICE_DATABASE,
-    SLICE_ENVIRONMENT,
     SLICE_SERVER,
     SLICE_SESSION_MANAGER,
     SLICE_TRANSPORT,
@@ -57,7 +56,6 @@
 from inmanta.server import protocol
 from inmanta.server.protocol import ReturnClient, ServerSlice, SessionListener, SessionManager
 from inmanta.server.server import Server
-from inmanta.server.services import environmentservice
 from inmanta.types import Apireturn, ArgumentTypes, ReturnTupple

 from ..data.dataview import AgentView
@@ -873,11 +871,8 @@ async def request_parameter(self, env_id: uuid.UUID, resource_id: ResourceIdStr)
                 resource_id not in self._fact_resource_block_set
                 or (self._fact_resource_block_set[resource_id] + self._fact_resource_block) < now
             ):
-                if opt.server_use_resource_scheduler.get():
-                    await self._autostarted_agent_manager._ensure_scheduler(env)
-                else:
-                    agents = await data.ConfigurationModel.get_agents(env.id, version)
-                    await self._autostarted_agent_manager._ensure_agents(env, agents)
+                agents = await data.ConfigurationModel.get_agents(env.id, version)
+                await self._autostarted_agent_manager._ensure_agents(env, agents)

                 client = self.get_agent_client(env_id, res.agent)
                 if client is not None:
@@ -937,14 +932,12 @@ async def get_agent_process_details(self, env: data.Environment, id: uuid.UUID,
         return dto

-class AutostartedAgentManager(ServerSlice, environmentservice.EnvironmentListener):
+class AutostartedAgentManager(ServerSlice):
     """
     An instance of this class manages autostarted agent instance processes. It does not manage the logical agents as those
     are managed by `:py:class:AgentManager`.
     """

-    environment_service: environmentservice.EnvironmentService
-
     def __init__(self) -> None:
         super().__init__(SLICE_AUTOSTARTED_AGENT_MANAGER)
         self._agent_procs: dict[UUID, subprocess.Process] = {}  # env uuid -> subprocess.Process
@@ -964,9 +957,6 @@ async def prestart(self, server: protocol.Server) -> None:
         assert isinstance(agent_manager, AgentManager)
         self._agent_manager = agent_manager

-        self.environment_service = cast(environmentservice.EnvironmentService, server.get_slice(SLICE_ENVIRONMENT))
-        self.environment_service.register_listener(self, environmentservice.EnvironmentAction.created)
-
     async def start(self) -> None:
         await super().start()
         self.add_background_task(self._start_agents())
@@ -992,24 +982,16 @@ async def _start_agents(self) -> None:
         environments = await data.Environment.get_list()
         for env in environments:
             autostart = await env.get(data.AUTOSTART_ON_START)
-            if not autostart:
-                continue
-
-            if opt.server_use_resource_scheduler.get():
-                await self._ensure_scheduler(env)
-            else:
+            if autostart:
                 agents = await data.Agent.get_list(environment=env.id)
-                agent_list = {a.name for a in agents}
+                agent_list = [a.name for a in agents]
                 await self._ensure_agents(env, agent_list)

     async def restart_agents(self, env: data.Environment) -> None:
         LOGGER.debug("Restarting agents in environment %s", env.id)
-        if opt.server_use_resource_scheduler.get():
-            await self._ensure_scheduler(env)
-        else:
-            agents = await data.Agent.get_list(environment=env.id)
-            agent_list = [a.name for a in agents]
-            await self._ensure_agents(env, agent_list, restart=True)
+        agents = await data.Agent.get_list(environment=env.id)
+        agent_list = [a.name for a in agents]
+        await self._ensure_agents(env, agent_list, restart=True)

     async def stop_agents(
         self,
@@ -1176,78 +1158,6 @@ async def _ensure_agents(
                 LOGGER.warning("Not all agent instances started successfully")
             return start_new_process

-    # Start/Restart scheduler
-    async def _ensure_scheduler(
-        self,
-        env: data.Environment,
-        *,
-        restart: bool = False,
-        connection: Optional[asyncpg.connection.Connection] = None,
-    ) -> bool:
-        """
-        Ensure that all agents defined in the current environment (model) that should be autostarted are started.
-
-        :param env: The environment to start the agents for
-        :param restart: Restart all agents even if the list of agents is up to date.
-        :param connection: The database connection to use. Must not be in a transaction context.
-
-        :return: True iff a new agent process was started.
-        """
-        if self._stopping:
-            raise ShutdownInProgress()
-
-        if connection is not None and connection.is_in_transaction():
-            # Should not be called in a transaction context because it has (immediate) side effects outside of the database
-            # that are tied to the database state. Several inconsistency issues could occur if this runs in a transaction
-            # context:
-            #   - side effects based on uncommitted reads (may even need to be rolled back)
-            #   - race condition with similar side effect flows due to stale reads (e.g. other flow pauses agent and kills
-            #     process, this one brings it back because it reads the agent as unpaused)
-            raise Exception("_ensure_scheduler should not be called in a transaction context")
-
-        autostart_scheduler = {const.AGENT_SCHEDULER_ID}
-        async with data.Agent.get_connection(connection) as connection:
-            async with self.agent_lock:
-                # silently ignore requests if this environment is halted
-                refreshed_env: Optional[data.Environment] = await data.Environment.get_by_id(env.id, connection=connection)
-                if refreshed_env is None:
-                    raise Exception("Can't ensure agent: environment %s does not exist" % env.id)
-                env = refreshed_env
-                if env.halted:
-                    return False
-
-                if not restart and await self._agent_manager.are_agents_active(env.id, autostart_scheduler):
-                    # do not start a new agent process if the agents are already active, regardless of whether their session
-                    # is with an autostarted process or not.
-                    return False
-
-                start_new_process: bool
-                if env.id not in self._agent_procs or self._agent_procs[env.id].returncode is not None:
-                    # Start new process if none is currently running for this environment.
-                    # Otherwise trust that it tracks any changes to the agent map.
-                    LOGGER.info("%s matches agents managed by server, ensuring they are started.", autostart_scheduler)
-                    start_new_process = True
-                elif restart:
-                    LOGGER.info(
-                        "%s matches agents managed by server, forcing restart: stopping process with PID %s.",
-                        autostart_scheduler,
-                        self._agent_procs[env.id],
-                    )
-                    await self._stop_autostarted_agents(env, connection=connection)
-                    start_new_process = True
-                else:
-                    start_new_process = False
-
-                if start_new_process:
-                    self._agent_procs[env.id] = await self.__do_start_agent(env, connection=connection)
-
-                # Wait for all agents to start
-                try:
-                    await self._wait_for_agents(env, autostart_scheduler, connection=connection)
-                except asyncio.TimeoutError:
-                    LOGGER.warning("Not all agent instances started successfully")
-                return start_new_process
-
     async def __do_start_agent(
         self, env: data.Environment, *, connection: Optional[asyncpg.connection.Connection] = None
     ) -> subprocess.Process:
@@ -1269,8 +1179,6 @@ async def __do_start_agent(

         agent_log = os.path.join(self._server_storage["logs"], "agent-%s.log" % env.id)

-        use_resource_scheduler: bool = opt.server_use_resource_scheduler.get()
-
         proc: subprocess.Process = await self._fork_inmanta(
             [
                 "--log-file-level",
                 "3",
                 "--timed-logs",
                 "--config",
                 Config._config_dir if Config._config_dir is not None else "",
                 "--log-file",
                 agent_log,
-                "scheduler" if use_resource_scheduler else "agent",
+                "agent",
             ],
             out,
             err,
         )
@@ -1488,17 +1396,3 @@ async def _wait_for_proc_bounded(
             await asyncio.wait_for(asyncio.gather(*[asyncio.shield(proc.wait()) for proc in unfinished_processes]), timeout)
         except asyncio.TimeoutError:
             LOGGER.warning("Agent processes did not close in time (%s)", procs)
-
-    async def environment_action_created(self, env: model.Environment) -> None:
-        """
-        Will be called when a new environment is created to create a scheduler agent
-
-        :param env: The new environment
-        """
-        if not opt.server_use_resource_scheduler.get():
-            return
-
-        env_db = await data.Environment.get_by_id(env.id)
-        await self._ensure_scheduler(env_db)
-        # We need to make sure that the AGENT_SCHEDULER is registered to be up and running
-        await self._agent_manager.ensure_agent_registered(env_db, const.AGENT_SCHEDULER_ID)
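The removed `_ensure_scheduler` has the same shape as the surviving `_ensure_agents`: under a lock, check whether a live process already serves the environment, kill it when a restart is forced, fork a new one otherwise, and report whether a new process was spawned. A stripped-down sketch of that ensure-singleton-process pattern (illustrative only; a placeholder command stands in for forking an inmanta agent):

    import asyncio


    class ProcessEnsurer:
        def __init__(self) -> None:
            self._lock = asyncio.Lock()  # one ensure() at a time, like agent_lock
            self._procs: dict[str, asyncio.subprocess.Process] = {}

        async def ensure(self, env_id: str, *, restart: bool = False) -> bool:
            # Return True iff a new process was started (same contract as the diff).
            async with self._lock:
                proc = self._procs.get(env_id)
                alive = proc is not None and proc.returncode is None
                if alive and not restart:
                    return False  # a live process already serves this environment
                if alive and restart:
                    assert proc is not None
                    proc.terminate()
                    await proc.wait()
                self._procs[env_id] = await asyncio.create_subprocess_exec("sleep", "3600")
                return True


    async def main() -> None:
        ensurer = ProcessEnsurer()
        print(await ensurer.ensure("env-1"))                 # True: first start
        print(await ensurer.ensure("env-1"))                 # False: already running
        print(await ensurer.ensure("env-1", restart=True))   # True: forced restart
        for proc in ensurer._procs.values():                 # clean up the demo process
            proc.terminate()
            await proc.wait()


    asyncio.run(main())
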
diff --git a/src/inmanta/server/config.py b/src/inmanta/server/config.py
index 43ba94b721..4de8955346 100644
--- a/src/inmanta/server/config.py
+++ b/src/inmanta/server/config.py
@@ -273,10 +273,6 @@ def validate_fact_renew(value: object) -> int:
     is_str_opt,
 )

-server_use_resource_scheduler = Option(
-    "server", "new-resource-scheduler", False, "Enable the new Resource Scheduler component", is_bool
-)
-

 def default_hangtime() -> int:
     """:inmanta.config:option:`server.agent-timeout` *3/4"""
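The deleted option is a plain boolean feature flag: every call site reads it with `.get()` and branches between the scheduler and classic-agent code paths. A self-contained stand-in showing the same shape (this mimics, not reuses, inmanta's `Option`; requires Python 3.10+ for the union annotation):

    class BoolOption:
        # Stand-in for the removed Option(...): hold a raw value, read with .get().

        def __init__(self, section: str, name: str, default: bool, documentation: str) -> None:
            self.section, self.name = section, name
            self.default, self.documentation = default, documentation
            self._raw: str | None = None

        def set(self, raw: str) -> None:
            self._raw = raw

        def get(self) -> bool:
            if self._raw is None:
                return self.default
            return self._raw.strip().lower() in ("1", "true", "yes", "on")


    server_use_resource_scheduler = BoolOption(
        "server", "new-resource-scheduler", False, "Enable the new Resource Scheduler component"
    )

    # Call sites branch exactly like the removed server code did:
    if server_use_resource_scheduler.get():
        print("start the scheduler agent ($__scheduler)")
    else:
        print("start one classic agent per agent name")
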
diff --git a/src/inmanta/server/services/compilerservice.py b/src/inmanta/server/services/compilerservice.py
index 620e8df1e8..3eda91596d 100644
--- a/src/inmanta/server/services/compilerservice.py
+++ b/src/inmanta/server/services/compilerservice.py
@@ -518,7 +518,6 @@ class CompilerService(ServerSlice, environmentservice.EnvironmentListener):
     """

     _env_folder: str
-    environment_service: environmentservice.EnvironmentService

     def __init__(self) -> None:
         super().__init__(SLICE_COMPILER)
diff --git a/src/inmanta/server/services/dryrunservice.py b/src/inmanta/server/services/dryrunservice.py
index 1e033f3b70..b8aa7e9308 100644
--- a/src/inmanta/server/services/dryrunservice.py
+++ b/src/inmanta/server/services/dryrunservice.py
@@ -21,14 +21,20 @@
 import uuid
 from typing import Optional, cast

-from inmanta import const, data
+from inmanta import data
 from inmanta.data.model import DryRun, DryRunReport, ResourceDiff, ResourceDiffStatus, ResourceVersionIdStr
 from inmanta.protocol import handle, methods, methods_v2
 from inmanta.protocol.exceptions import NotFound
 from inmanta.resources import Id
-from inmanta.server import SLICE_AGENT_MANAGER, SLICE_AUTOSTARTED_AGENT_MANAGER, SLICE_DATABASE, SLICE_DRYRUN, SLICE_TRANSPORT
-from inmanta.server import config as opt
-from inmanta.server import diff, protocol
+from inmanta.server import (
+    SLICE_AGENT_MANAGER,
+    SLICE_AUTOSTARTED_AGENT_MANAGER,
+    SLICE_DATABASE,
+    SLICE_DRYRUN,
+    SLICE_TRANSPORT,
+    diff,
+    protocol,
+)
 from inmanta.server.agentmanager import AgentManager, AutostartedAgentManager
 from inmanta.types import Apireturn, JsonType

@@ -73,12 +79,8 @@ async def create_dryrun(self, env: data.Environment, version_id: int, model: dat
         # Create a dryrun document
         dryrun = await data.DryRun.create(environment=env.id, model=version_id, todo=len(rvs), total=len(rvs))

-        if opt.server_use_resource_scheduler.get():
-            agents = [const.AGENT_SCHEDULER_ID]
-            await self.autostarted_agent_manager._ensure_scheduler(env)
-        else:
-            agents = await data.ConfigurationModel.get_agents(env.id, version_id)
-            await self.autostarted_agent_manager._ensure_agents(env, agents)
+        agents = await data.ConfigurationModel.get_agents(env.id, version_id)
+        await self.autostarted_agent_manager._ensure_agents(env, agents)

         agents_down = []
         for agent in agents:
diff --git a/src/inmanta/server/services/environmentservice.py b/src/inmanta/server/services/environmentservice.py
index f4c08a14f2..0b4bd93658 100644
--- a/src/inmanta/server/services/environmentservice.py
+++ b/src/inmanta/server/services/environmentservice.py
@@ -50,11 +50,13 @@
     SLICE_RESOURCE,
     SLICE_SERVER,
     SLICE_TRANSPORT,
-    agentmanager,
     protocol,
 )
+from inmanta.server.agentmanager import AgentManager, AutostartedAgentManager
 from inmanta.server.server import Server
-from inmanta.server.services import orchestrationservice, resourceservice
+from inmanta.server.services import compilerservice
+from inmanta.server.services.orchestrationservice import OrchestrationService
+from inmanta.server.services.resourceservice import ResourceService
 from inmanta.types import Apireturn, JsonType, Warnings

 LOGGER = logging.getLogger(__name__)

@@ -83,18 +85,21 @@ class EnvironmentListener:
     async def environment_action_created(self, env: model.Environment) -> None:
         """
         Will be called when a new environment is created
+
         :param env: The new environment
         """

     async def environment_action_cleared(self, env: model.Environment) -> None:
         """
         Will be called when the environment is cleared
+
         :param env: The environment that is cleared
         """

     async def environment_action_deleted(self, env: model.Environment) -> None:
         """
         Will be called when the environment is deleted
+
         :param env: The environment that is deleted
         """

@@ -110,10 +115,10 @@ class EnvironmentService(protocol.ServerSlice):
     """Slice with project and environment management"""

     server_slice: Server
-    agent_manager: "agentmanager.AgentManager"
-    autostarted_agent_manager: "agentmanager.AutostartedAgentManager"
-    orchestration_service: "orchestrationservice.OrchestrationService"
-    resource_service: "resourceservice.ResourceService"
+    agent_manager: AgentManager
+    autostarted_agent_manager: AutostartedAgentManager
+    orchestration_service: OrchestrationService
+    resource_service: ResourceService
     listeners: dict[EnvironmentAction, list[EnvironmentListener]]
     # environment_state_operation_lock is to prevent concurrent execution of
     # operations that modify the state of an environment, such as halting, resuming, or deleting.
@@ -146,16 +151,15 @@ def get_depended_by(self) -> list[str]:
     async def prestart(self, server: protocol.Server) -> None:
         await super().prestart(server)
         self.server_slice = cast(Server, server.get_slice(SLICE_SERVER))
-        self.agent_manager = cast(agentmanager.AgentManager, server.get_slice(SLICE_AGENT_MANAGER))
-        self.autostarted_agent_manager = cast(
-            agentmanager.AutostartedAgentManager, server.get_slice(SLICE_AUTOSTARTED_AGENT_MANAGER)
-        )
-        self.orchestration_service = cast(orchestrationservice.OrchestrationService, server.get_slice(SLICE_ORCHESTRATION))
-        self.resource_service = cast(resourceservice.ResourceService, server.get_slice(SLICE_RESOURCE))
+        self.agent_manager = cast(AgentManager, server.get_slice(SLICE_AGENT_MANAGER))
+        self.autostarted_agent_manager = cast(AutostartedAgentManager, server.get_slice(SLICE_AUTOSTARTED_AGENT_MANAGER))
+        self.compiler_service = cast(compilerservice.CompilerService, server.get_slice(SLICE_COMPILER))
+        self.orchestration_service = cast(OrchestrationService, server.get_slice(SLICE_ORCHESTRATION))
+        self.resource_service = cast(ResourceService, server.get_slice(SLICE_RESOURCE))
         # Register the compiler service here to the environment service listener. Registering it within the compiler service
         # would result in a circular dependency between the environment slice and the compiler service slice.
         self.register_listener_for_multiple_actions(
-            self.server_slice.compiler, {EnvironmentAction.cleared, EnvironmentAction.deleted}
+            self.compiler_service, {EnvironmentAction.cleared, EnvironmentAction.deleted}
         )

     async def start(self) -> None:
@@ -179,13 +183,13 @@ async def _enable_schedules(self, env: data.Environment, setting: Optional[data.
         if setting is None or setting.name == data.AUTO_FULL_COMPILE:
             if setting is not None:
                 LOGGER.info("Environment setting %s changed. Rescheduling full compiles.", setting.name)
-            self.server_slice.compiler.schedule_full_compile(env, str(await env.get(data.AUTO_FULL_COMPILE)))
+            self.compiler_service.schedule_full_compile(env, str(await env.get(data.AUTO_FULL_COMPILE)))

     def _disable_schedules(self, env: data.Environment) -> None:
         """
         Removes scheduling of all appropriate actions for a single environment.
""" - self.server_slice.compiler.schedule_full_compile(env, schedule_cron="") + self.compiler_service.schedule_full_compile(env, schedule_cron="") async def _setting_change(self, env: data.Environment, key: str) -> Warnings: setting = env._settings[key] @@ -519,7 +523,7 @@ async def environment_delete(self, environment_id: uuid.UUID) -> None: await self._halt(env, connection=connection, delete_agent_venv=True) self._disable_schedules(env) - await self.server_slice.compiler.cancel_compile(env.id) + await self.compiler_service.cancel_compile(env.id) # Delete the environment directory before deleting the database records. This ensures that # this operation can be retried if the deletion of the environment directory fails. Otherwise, # the environment directory would be left in an inconsistent state. This can cause problems if @@ -549,7 +553,7 @@ async def environment_clear(self, env: data.Environment) -> None: # Keep this method call under the self.environment_state_operation_lock lock, because cancel_compile() # must be called on halted environments only. - await self.server_slice.compiler.cancel_compile(env.id) + await self.compiler_service.cancel_compile(env.id) # Delete the environment directory before deleting the database records. This ensures that # this operation can be retried if the deletion of the environment directory fails. Otherwise, # the environment directory would be left in an inconsistent state. This can cause problems if @@ -620,27 +624,25 @@ async def environment_setting_delete(self, env: data.Environment, key: str) -> R except KeyError: raise NotFound() - def register_listener_for_multiple_actions( - self, current_listener: EnvironmentListener, actions: Set[EnvironmentAction] - ) -> None: + def register_listener_for_multiple_actions(self, listener: EnvironmentListener, actions: Set[EnvironmentAction]) -> None: """ Should only be called during pre-start - :param current_listener: The listener to register + :param listener: The listener to register :param actions: type of actions the listener is interested in """ for action in actions: - self.register_listener(current_listener, action) + self.register_listener(listener, action) - def register_listener(self, current_listener: EnvironmentListener, action: EnvironmentAction) -> None: + def register_listener(self, listener: EnvironmentListener, action: EnvironmentAction) -> None: """ Should only be called during pre-start - :param current_listener: The listener to register + :param listener: The listener to register :param action: type of action the listener is interested in """ - self.listeners[action].append(current_listener) + self.listeners[action].append(listener) - def remove_listener(self, action: EnvironmentAction, current_listener: EnvironmentListener) -> None: - self.listeners[action].remove(current_listener) + def remove_listener(self, action: EnvironmentAction, listener: EnvironmentListener) -> None: + self.listeners[action].remove(listener) async def _delete_environment_dir(self, environment_id: uuid.UUID) -> None: """ @@ -672,21 +674,18 @@ async def _delete_environment_dir(self, environment_id: uuid.UUID) -> None: ) async def notify_listeners( - self, - action: EnvironmentAction, - updated_env: model.Environment, - original_env: Optional[model.Environment] = None, + self, action: EnvironmentAction, updated_env: model.Environment, original_env: Optional[model.Environment] = None ) -> None: - for current_listener in self.listeners[action]: + for listener in self.listeners[action]: try: if action == 
diff --git a/src/inmanta/server/services/orchestrationservice.py b/src/inmanta/server/services/orchestrationservice.py
index 48acbcd2d3..f8aeb2f524 100644
--- a/src/inmanta/server/services/orchestrationservice.py
+++ b/src/inmanta/server/services/orchestrationservice.py
@@ -61,11 +61,11 @@
     SLICE_ORCHESTRATION,
     SLICE_RESOURCE,
     SLICE_TRANSPORT,
-    agentmanager,
 )
 from inmanta.server import config as opt
 from inmanta.server import diff, protocol
-from inmanta.server.services import resourceservice
+from inmanta.server.agentmanager import AgentManager, AutostartedAgentManager
+from inmanta.server.services.resourceservice import ResourceService
 from inmanta.server.validate_filter import InvalidFilter
 from inmanta.types import Apireturn, JsonType, PrimitiveTypes, ReturnTupple

@@ -378,9 +378,9 @@ async def merge_unknowns(
 class OrchestrationService(protocol.ServerSlice):
     """Resource Manager service"""

-    agentmanager_service: "agentmanager.AgentManager"
-    autostarted_agent_manager: "agentmanager.AutostartedAgentManager"
-    resource_service: "resourceservice.ResourceService"
+    agentmanager_service: "AgentManager"
+    autostarted_agent_manager: AutostartedAgentManager
+    resource_service: ResourceService

     def __init__(self) -> None:
         super().__init__(SLICE_ORCHESTRATION)
@@ -393,11 +393,9 @@ def get_depended_by(self) -> list[str]:

     async def prestart(self, server: protocol.Server) -> None:
         await super().prestart(server)
-        self.agentmanager_service = cast("agentmanager.AgentManager", server.get_slice(SLICE_AGENT_MANAGER))
-        self.autostarted_agent_manager = cast(
-            agentmanager.AutostartedAgentManager, server.get_slice(SLICE_AUTOSTARTED_AGENT_MANAGER)
-        )
-        self.resource_service = cast("resourceservice.ResourceService", server.get_slice(SLICE_RESOURCE))
+        self.agentmanager_service = cast("AgentManager", server.get_slice(SLICE_AGENT_MANAGER))
+        self.autostarted_agent_manager = cast(AutostartedAgentManager, server.get_slice(SLICE_AUTOSTARTED_AGENT_MANAGER))
+        self.resource_service = cast(ResourceService, server.get_slice(SLICE_RESOURCE))

     async def start(self) -> None:
         if PERFORM_CLEANUP:
@@ -819,12 +817,7 @@ async def _put_version(

         await data.UnknownParameter.insert_many(unknowns, connection=connection)

-        all_agents: abc.Set[str]
-        if opt.server_use_resource_scheduler.get():
-            all_agents = {const.AGENT_SCHEDULER_ID}
-        else:
-            all_agents = {res.agent for res in rid_to_resource.values()}
-
+        all_agents: abc.Set[str] = {res.agent for res in rid_to_resource.values()}
         for agent in all_agents:
             await self.agentmanager_service.ensure_agent_registered(env, agent, connection=connection)
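In the restored `_put_version`, the agent set is derived directly from the resources in the pushed version, whereas the removed branch collapsed everything onto the single scheduler agent. Resource ids embed the agent name (e.g. `test::Resource[agent2,key=key1]` in the deleted test further down), so the derivation is a simple parse; a toy illustration:

    AGENT_SCHEDULER_ID = "$__scheduler"  # value taken from the removed const.py hunk


    def agents_for(resource_ids: list[str], use_scheduler: bool) -> set[str]:
        if use_scheduler:
            # Removed branch: everything is handled by the single scheduler agent.
            return {AGENT_SCHEDULER_ID}
        # Restored branch: one entry per agent named inside the resource id.
        return {rid.split("[", 1)[1].split(",", 1)[0] for rid in resource_ids}


    rids = ["test::Resource[agent2,key=key1]", "test::File[agent1,path=/tmp/x]"]
    assert agents_for(rids, use_scheduler=False) == {"agent1", "agent2"}
    assert agents_for(rids, use_scheduler=True) == {AGENT_SCHEDULER_ID}
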
@@ -1198,24 +1191,7 @@ async def release_version(
                     await model.mark_done(connection=connection)
                     return 200, {"model": model}

-                is_using_new_scheduler = opt.server_use_resource_scheduler.get()
-                # New code relying on the ResourceScheduler
-                if is_using_new_scheduler:
-                    if connection.is_in_transaction():
-                        raise RuntimeError(
-                            "The release of a new version cannot be in a transaction! "
-                            "The agent would not see the data that was committed"
-                        )
-                    await self.autostarted_agent_manager._ensure_scheduler(env)
-                    agent = const.AGENT_SCHEDULER_ID
-
-                    client = self.agentmanager_service.get_agent_client(env.id, const.AGENT_SCHEDULER_ID)
-                    if client is not None:
-                        self.add_background_task(client.trigger_release_version(env.id))
-                    else:
-                        LOGGER.warning("Agent %s from model %s in env %s is not available for a deploy", agent, version_id, env.id)
-                # Old code
-                elif push:
+                if push:
                     # We can't be in a transaction here, or the agent will not see the data that was committed
                     # This assert prevents anyone from wrapping this method in a transaction by accident
                     assert not connection.is_in_transaction()
@@ -1225,7 +1201,6 @@ async def release_version(
                     agents = await data.ConfigurationModel.get_agents(env.id, version_id, connection=connection)
                     await self.autostarted_agent_manager._ensure_agents(env, agents, connection=connection)

-                    assert agents is not None
                     for agent in agents:
                         client = self.agentmanager_service.get_agent_client(env.id, agent)
                         if client is not None:
@@ -1271,14 +1246,11 @@ async def deploy(
         if not allagents:
             return attach_warnings(404, {"message": "No agent could be reached"}, warnings)

-        is_using_new_scheduler = opt.server_use_resource_scheduler.get()
-        if is_using_new_scheduler:
-            await self.autostarted_agent_manager._ensure_scheduler(env)
-        else:
-            await self.autostarted_agent_manager._ensure_agents(env, allagents)
-
         present = set()
         absent = set()
+
+        await self.autostarted_agent_manager._ensure_agents(env, allagents)
+
         for agent in allagents:
             client = self.agentmanager_service.get_agent_client(env.id, agent)
             if client is not None:
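Both the removed scheduler branch and the restored push branch refuse to trigger agents from inside a database transaction: the agent would query the database before the release is committed and act on stale data. The guard is just a predicate check before the side effect; a sketch with a fake connection object (asyncpg's real `Connection.is_in_transaction()` backs the code above):

    class FakeConnection:
        # Toy connection exposing the same predicate the server code asserts on.

        def __init__(self, in_tx: bool) -> None:
            self._in_tx = in_tx

        def is_in_transaction(self) -> bool:
            return self._in_tx


    def trigger_agents(connection: FakeConnection) -> str:
        if connection.is_in_transaction():
            raise RuntimeError(
                "The release of a new version cannot be in a transaction! "
                "The agent would not see the data that was committed"
            )
        return "agents triggered after commit"


    print(trigger_agents(FakeConnection(in_tx=False)))
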
cast("AgentManager", server.get_slice(SLICE_AGENT_MANAGER)) async def start(self) -> None: self.schedule( diff --git a/tests/deploy/test_scheduler.py b/tests/deploy/test_scheduler.py deleted file mode 100644 index 2bf69f8fdd..0000000000 --- a/tests/deploy/test_scheduler.py +++ /dev/null @@ -1,328 +0,0 @@ -""" - Copyright 2024 Inmanta - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - Contact: code@inmanta.com -""" - -import copy -import logging -import typing -import uuid - -import inmanta.execute.util -from inmanta import const, data -from inmanta.config import Config -from inmanta.deploy.scheduler import ResourceScheduler -from inmanta.util import get_compiler_version, retry_limited -from utils import UNKWN, ClientHelper, assert_equal_ish - -logger = logging.getLogger("inmanta.test.server_agent") - - -async def test_deploy_new_scheduler(server, client, async_finalizer, no_agent_backoff): - """ - This tests make sure the resource scheduler is working as expected for these parts: - - Construction of initial model state - - Retrieval of data when a new version is released - """ - # First part - test the ResourceScheduler (retrieval of data from DB) - Config.set("config", "agent-deploy-interval", "100") - Config.set("server", "new-resource-scheduler", "True") - - result = await client.create_project("env-test") - project_id = result.result["project"]["id"] - - result = await client.create_environment(project_id=project_id, name="dev") - env_id = result.result["environment"]["id"] - env = await data.Environment.get_by_id(uuid.UUID(env_id)) - await env.set(data.AUTO_DEPLOY, False) - await env.set(data.PUSH_ON_AUTO_DEPLOY, False) - await env.set(data.AGENT_TRIGGER_METHOD_ON_AUTO_DEPLOY, const.AgentTriggerMethod.push_full_deploy) - - clienthelper = ClientHelper(client, env_id) - - version = await clienthelper.get_version() - - resources = [ - { - "key": "key1", - "value": "value1", - "id": "test::Resource[agent2,key=key1],v=%d" % version, - "send_event": False, - "purged": False, - "requires": [], - }, - { - "key": "key2", - "value": inmanta.execute.util.Unknown(source=None), - "id": "test::Resource[agent2,key=key2],v=%d" % version, - "send_event": False, - "purged": False, - "requires": [], - }, - { - "key": "key4", - "value": inmanta.execute.util.Unknown(source=None), - "id": "test::Resource[agent2,key=key4],v=%d" % version, - "send_event": False, - "requires": ["test::Resource[agent2,key=key1],v=%d" % version, "test::Resource[agent2,key=key2],v=%d" % version], - "purged": False, - }, - { - "key": "key5", - "value": "val", - "id": "test::Resource[agent2,key=key5],v=%d" % version, - "send_event": False, - "requires": ["test::Resource[agent2,key=key4],v=%d" % version], - "purged": False, - }, - ] - - status = { - "test::Resource[agent2,key=key4]": const.ResourceState.undefined, - "test::Resource[agent2,key=key2]": const.ResourceState.undefined, - } - result = await client.put_version( - tid=env_id, - version=version, - resources=resources, - resource_state=status, - unknowns=[], - version_info={}, - 
-        compiler_version=get_compiler_version(),
-    )
-    assert result.code == 200
-
-    scheduler = ResourceScheduler(env_id)
-    await scheduler.start()
-
-    for resource in resources:
-        id_without_version, _, _ = resource["id"].partition(",v=")
-        assert id_without_version in scheduler._state.resources
-        expected_resource_attributes = copy.deepcopy(resource)
-        expected_resource_attributes.pop("id")
-        current_attributes = scheduler._state.resources[id_without_version].attributes
-
-        if current_attributes["value"] == "<>" and isinstance(
-            expected_resource_attributes["value"], inmanta.execute.util.Unknown
-        ):
-            expected_resource_attributes["value"] = "<>"
-        new_requires = []
-        for require in expected_resource_attributes["requires"]:
-            require_without_version, _, _ = require.partition(",v=")
-            new_requires.append(require_without_version)
-        expected_resource_attributes["requires"] = new_requires
-        assert current_attributes == expected_resource_attributes
-        # This resource has no requirements
-        if id_without_version not in scheduler._state.requires._primary:
-            assert expected_resource_attributes["requires"] == []
-        else:
-            assert scheduler._state.requires._primary[id_without_version] == set(expected_resource_attributes["requires"])
-
-    version = await ClientHelper(client, env_id).get_version()
-
-    # The purged status has been changed for: `test::Resource[agent2,key=key2]` and `test::Resource[agent2,key=key4]`
-    updated_resources = [
-        {
-            "key": "key1",
-            "value": "value1",
-            "id": "test::Resource[agent2,key=key1],v=%d" % version,
-            "send_event": False,
-            "purged": False,
-            "requires": [],
-        },
-        {
-            "key": "key2",
-            "value": inmanta.execute.util.Unknown(source=None),
-            "id": "test::Resource[agent2,key=key2],v=%d" % version,
-            "send_event": False,
-            "purged": True,
-            "requires": [],
-        },
-        {
-            "key": "key4",
-            "value": inmanta.execute.util.Unknown(source=None),
-            "id": "test::Resource[agent2,key=key4],v=%d" % version,
-            "send_event": False,
-            "requires": ["test::Resource[agent2,key=key1],v=%d" % version, "test::Resource[agent2,key=key2],v=%d" % version],
-            "purged": True,
-        },
-        {
-            "key": "key5",
-            "value": "val",
-            "id": "test::Resource[agent2,key=key5],v=%d" % version,
-            "send_event": False,
-            "requires": ["test::Resource[agent2,key=key4],v=%d" % version],
-            "purged": False,
-        },
-    ]
-    result = await client.put_version(
-        tid=env_id,
-        version=version,
-        resources=updated_resources,
-        resource_state=status,
-        unknowns=[],
-        version_info={},
-        compiler_version=get_compiler_version(),
-    )
-    assert result.code == 200
-
-    # We test the new_version method of the ResourceScheduler
-    await scheduler.new_version()
-
-    for resource in updated_resources:
-        id_without_version, _, _ = resource["id"].partition(",v=")
-        assert id_without_version in scheduler._state.resources
-        expected_resource_attributes = copy.deepcopy(resource)
-        expected_resource_attributes.pop("id")
-        current_attributes = scheduler._state.resources[id_without_version].attributes
-
-        if current_attributes["value"] == "<>" and isinstance(
-            expected_resource_attributes["value"], inmanta.execute.util.Unknown
-        ):
-            expected_resource_attributes["value"] = "<>"
-        new_requires = []
-        for require in expected_resource_attributes["requires"]:
-            require_without_version, _, _ = require.partition(",v=")
-            new_requires.append(require_without_version)
-        expected_resource_attributes["requires"] = new_requires
-        assert current_attributes == expected_resource_attributes
-        if id_without_version not in scheduler._state.requires._primary:
expected_resource_attributes["requires"] == [] - else: - assert scheduler._state.requires._primary[id_without_version] == set(expected_resource_attributes["requires"]) - - # Now we make sure that the agent is up and running for this environment - result = await client.list_agent_processes(env_id) - assert result.code == 200 - - async def done() -> bool: - result = await client.list_agent_processes(env_id) - assert result.code == 200 - return len(result.result["processes"]) == 1 - - await retry_limited(done, 5) - - result = await client.list_agent_processes(env_id) - assert len(result.result["processes"]) == 1 - - endpoint_id: typing.Optional[uuid.UUID] = None - for proc in result.result["processes"]: - assert proc["environment"] == env_id - assert len(proc["endpoints"]) == 1 - assert proc["endpoints"][0]["name"] == const.AGENT_SCHEDULER_ID - endpoint_id = proc["endpoints"][0]["id"] - assert endpoint_id is not None - - assert_equal_ish( - { - "processes": [ - { - "expired": None, - "environment": env_id, - "endpoints": [{"name": UNKWN, "process": UNKWN, "id": UNKWN}], - "hostname": UNKWN, - "first_seen": UNKWN, - "last_seen": UNKWN, - }, - ] - }, - result.result, - ["name", "first_seen"], - ) - - # The agent was there because `ensure_agent_registered` was called each time we release a new version or call - # `put_version` + `AUTO_DEPLOY` set to `True` - # But this will be tested later in the test to make sure that when we create a new environment, everything is started and - # registered correctly - result = await client.list_agents(tid=env_id) - assert result.code == 200 - - expected_agent = { - "agents": [ - { - "last_failover": UNKWN, - "environment": env_id, - "paused": False, - "primary": endpoint_id, - "name": const.AGENT_SCHEDULER_ID, - "state": "up", - } - ] - } - - assert_equal_ish(expected_agent, result.result) - - # We test the creation of this new agent on another environment - result = await client.create_environment(project_id=project_id, name="dev2") - new_env_id = result.result["environment"]["id"] - - result = await client.list_agent_processes(new_env_id) - assert result.code == 200 - - async def done() -> bool: - result = await client.list_agent_processes(new_env_id) - assert result.code == 200 - return len(result.result["processes"]) == 1 - - await retry_limited(done, 5) - - result = await client.list_agent_processes(new_env_id) - assert len(result.result["processes"]) == 1 - - new_endpoint_id: typing.Optional[uuid.UUID] = None - for proc in result.result["processes"]: - assert proc["environment"] == new_env_id - assert len(proc["endpoints"]) == 1 - assert proc["endpoints"][0]["name"] == const.AGENT_SCHEDULER_ID - new_endpoint_id = proc["endpoints"][0]["id"] - assert new_endpoint_id is not None - - assert_equal_ish( - { - "processes": [ - { - "expired": None, - "environment": new_env_id, - "endpoints": [{"name": UNKWN, "process": UNKWN, "id": UNKWN}], - "hostname": UNKWN, - "first_seen": UNKWN, - "last_seen": UNKWN, - }, - ] - }, - result.result, - ["name", "first_seen"], - ) - - result = await client.list_agents(tid=new_env_id) - assert result.code == 200 - - expected_agent = { - "agents": [ - { - "last_failover": UNKWN, - "environment": new_env_id, - "paused": False, - "primary": new_endpoint_id, - "name": const.AGENT_SCHEDULER_ID, - "state": "up", - } - ] - } - - assert_equal_ish(expected_agent, result.result) - - Config.set("server", "new-resource-scheduler", "False") diff --git a/tests/test_project.py b/tests/test_project.py index 137102746b..89bc0786c4 100644 --- 
diff --git a/tests/test_project.py b/tests/test_project.py
index 137102746b..89bc0786c4 100644
--- a/tests/test_project.py
+++ b/tests/test_project.py
@@ -29,7 +29,7 @@
 from inmanta.data import model
 from inmanta.module import ModuleLoadingException, Project
 from inmanta.server import SLICE_ENVIRONMENT
-from inmanta.server.services import environmentservice
+from inmanta.server.services.environmentservice import EnvironmentAction, EnvironmentListener, EnvironmentService
 from utils import log_contains

@@ -362,7 +362,7 @@ async def test_create_with_id(client):

 async def test_environment_listener(server, client_v2, caplog):
-    class EnvironmentListenerCounter(environmentservice.EnvironmentListener):
+    class EnvironmentListenerCounter(EnvironmentListener):
         def __init__(self):
             self.created_counter = 0
             self.updated_counter = 0
@@ -385,15 +385,10 @@ async def environment_action_updated(self, updated_env: model.Environment, origi

     environment_listener = EnvironmentListenerCounter()

-    environment_service = cast(environmentservice.EnvironmentService, server.get_slice(SLICE_ENVIRONMENT))
+    environment_service = cast(EnvironmentService, server.get_slice(SLICE_ENVIRONMENT))
     environment_service.register_listener_for_multiple_actions(
         environment_listener,
-        {
-            environmentservice.EnvironmentAction.created,
-            environmentservice.EnvironmentAction.updated,
-            environmentservice.EnvironmentAction.deleted,
-            environmentservice.EnvironmentAction.cleared,
-        },
+        {EnvironmentAction.created, EnvironmentAction.updated, EnvironmentAction.deleted, EnvironmentAction.cleared},
     )
     result = await client_v2.project_create("project-test")
     assert result.code == 200