diff --git a/changelogs/unreleased/8253-read-resource-state-from-scheduler.yml b/changelogs/unreleased/8253-read-resource-state-from-scheduler.yml new file mode 100644 index 0000000000..fb24dce2a4 --- /dev/null +++ b/changelogs/unreleased/8253-read-resource-state-from-scheduler.yml @@ -0,0 +1,5 @@ +description: Read resource state from scheduler instead of reading from DB +issue-nr: 8253 +change-type: minor +destination-branches: [master] + diff --git a/src/inmanta/agent/executor.py b/src/inmanta/agent/executor.py index c522341c12..b1691aad0b 100644 --- a/src/inmanta/agent/executor.py +++ b/src/inmanta/agent/executor.py @@ -537,7 +537,7 @@ async def execute( gid: uuid.UUID, resource_details: ResourceDetails, reason: str, - requires: dict[ResourceIdStr, const.ResourceState], + requires: Mapping[ResourceIdStr, const.ResourceState], ) -> DeployResult: """ Perform the actual deployment of the resource by calling the loaded handler code diff --git a/src/inmanta/agent/forking_executor.py b/src/inmanta/agent/forking_executor.py index 3183885ec5..fdd3aef8d4 100644 --- a/src/inmanta/agent/forking_executor.py +++ b/src/inmanta/agent/forking_executor.py @@ -75,6 +75,7 @@ import typing import uuid from asyncio import Future, transports +from collections.abc import Mapping from concurrent.futures import ThreadPoolExecutor from typing import Awaitable @@ -491,7 +492,7 @@ def __init__( gid: uuid.UUID, resource_details: "inmanta.agent.executor.ResourceDetails", reason: str, - requires: dict[ResourceIdStr, const.ResourceState], + requires: Mapping[ResourceIdStr, const.ResourceState], ) -> None: self.agent_name = agent_name self.gid = gid @@ -811,7 +812,7 @@ async def execute( gid: uuid.UUID, resource_details: "inmanta.agent.executor.ResourceDetails", reason: str, - requires: dict[ResourceIdStr, const.ResourceState], + requires: Mapping[ResourceIdStr, const.ResourceState], ) -> DeployResult: return await self.call(ExecuteCommand(self.id.agent_name, action_id, gid, resource_details, reason, requires)) diff --git a/src/inmanta/agent/in_process_executor.py b/src/inmanta/agent/in_process_executor.py index f1f8dc922a..8ead2df90e 100644 --- a/src/inmanta/agent/in_process_executor.py +++ b/src/inmanta/agent/in_process_executor.py @@ -19,6 +19,7 @@ import uuid from asyncio import InvalidStateError, Lock from collections import defaultdict +from collections.abc import Mapping from concurrent.futures.thread import ThreadPoolExecutor from typing import Any, Optional @@ -141,7 +142,7 @@ async def _execute( resource: Resource, gid: uuid.UUID, ctx: handler.HandlerContext, - requires: dict[ResourceIdStr, const.ResourceState], + requires: Mapping[ResourceIdStr, const.ResourceState], ) -> None: """ Get the handler for a given resource and run its ``deploy`` method. @@ -220,7 +221,7 @@ async def execute( gid: uuid.UUID, resource_details: ResourceDetails, reason: str, - requires: dict[ResourceIdStr, const.ResourceState], + requires: Mapping[ResourceIdStr, const.ResourceState], ) -> DeployResult: try: resource: Resource = Resource.deserialize(resource_details.attributes) diff --git a/src/inmanta/agent/write_barier_executor.py b/src/inmanta/agent/write_barier_executor.py index 57adde9fdd..801ab25ede 100644 --- a/src/inmanta/agent/write_barier_executor.py +++ b/src/inmanta/agent/write_barier_executor.py @@ -19,6 +19,7 @@ import concurrent import typing import uuid +from collections.abc import Mapping from copy import deepcopy from inmanta import const @@ -39,7 +40,7 @@ async def execute( gid: uuid.UUID, resource_details: ResourceDetails, reason: str, - requires: dict[ResourceIdStr, const.ResourceState], + requires: Mapping[ResourceIdStr, const.ResourceState], ) -> DeployResult: return await self.delegate.execute( action_id, diff --git a/src/inmanta/deploy/persistence.py b/src/inmanta/deploy/persistence.py index d2809c2db9..09ae6755b6 100644 --- a/src/inmanta/deploy/persistence.py +++ b/src/inmanta/deploy/persistence.py @@ -27,7 +27,7 @@ from inmanta import const, data from inmanta.agent.executor import DeployResult, DryrunResult, FactResult from inmanta.const import TERMINAL_STATES, TRANSIENT_STATES, VALID_STATES_ON_STATE_UPDATE, Change, ResourceState -from inmanta.data.model import AttributeStateChange, ResourceIdStr, ResourceVersionIdStr +from inmanta.data.model import AttributeStateChange, ResourceVersionIdStr from inmanta.protocol import Client from inmanta.protocol.exceptions import BadRequest, Conflict, NotFound from inmanta.resources import Id @@ -44,9 +44,7 @@ class StateUpdateManager(abc.ABC): """ @abc.abstractmethod - async def send_in_progress( - self, action_id: UUID, resource_id: ResourceVersionIdStr - ) -> dict[ResourceIdStr, const.ResourceState]: + async def send_in_progress(self, action_id: UUID, resource_id: ResourceVersionIdStr) -> None: # FIXME: get rid of version in the id pass @@ -72,9 +70,7 @@ def __init__(self, client: Client, environment: UUID) -> None: self.client = client self.environment = environment - async def send_in_progress( - self, action_id: UUID, resource_id: ResourceVersionIdStr - ) -> dict[ResourceIdStr, const.ResourceState]: + async def send_in_progress(self, action_id: UUID, resource_id: ResourceVersionIdStr) -> None: result = await self.client.resource_deploy_start( tid=self.environment, rvid=resource_id, @@ -82,7 +78,6 @@ async def send_in_progress( ) if result.code != 200 or result.result is None: raise Exception("Failed to report the start of the deployment to the server") - return {Id.parse_id(key).resource_str(): const.ResourceState[value] for key, value in result.result["data"].items()} async def send_deploy_done(self, result: DeployResult) -> None: changes: dict[ResourceVersionIdStr, dict[str, AttributeStateChange]] = {result.rvid: result.changes} @@ -153,9 +148,7 @@ def log_resource_action(self, env: UUID, resource_id: str, log_level: int, ts: d log_record = resourceservice.ResourceActionLogLine(logger.name, log_level, message, ts) logger.handle(log_record) - async def send_in_progress( - self, action_id: UUID, resource_id: ResourceVersionIdStr - ) -> dict[ResourceIdStr, const.ResourceState]: + async def send_in_progress(self, action_id: UUID, resource_id: ResourceVersionIdStr) -> None: resource_id_str = resource_id resource_id_parsed = Id.parse_id(resource_id_str) @@ -197,13 +190,6 @@ async def send_in_progress( # FIXME: we may want to have this in the RPS table instead of Resource table, at some point await resource.update_fields(connection=connection, status=const.ResourceState.deploying) - # FIXME: shortcut to use scheduler state - result = await data.Resource.get_last_non_deploying_state_for_dependencies( - environment=self.environment, resource_version_id=resource_id_parsed, connection=connection - ) - - return {Id.parse_id(key).resource_str(): const.ResourceState[value] for key, value in result.items()} - async def send_deploy_done(self, result: DeployResult) -> None: def error_and_log(message: str, **context: Any) -> None: """ diff --git a/src/inmanta/deploy/scheduler.py b/src/inmanta/deploy/scheduler.py index 116a4962b6..3bd99ffed8 100644 --- a/src/inmanta/deploy/scheduler.py +++ b/src/inmanta/deploy/scheduler.py @@ -23,6 +23,7 @@ import uuid from abc import abstractmethod from collections.abc import Collection, Mapping, Set +from dataclasses import dataclass from typing import Optional from uuid import UUID @@ -36,7 +37,15 @@ from inmanta.data.model import ResourceIdStr, ResourceType, ResourceVersionIdStr from inmanta.deploy import work from inmanta.deploy.persistence import StateUpdateManager, ToDbUpdateManager -from inmanta.deploy.state import AgentStatus, DeploymentResult, ModelState, ResourceDetails, ResourceState, ResourceStatus +from inmanta.deploy.state import ( + AgentStatus, + BlockedStatus, + DeploymentResult, + ModelState, + ResourceDetails, + ResourceState, + ResourceStatus, +) from inmanta.deploy.tasks import Deploy, DryRun, RefreshFact from inmanta.deploy.work import PrioritizedTask, TaskPriority from inmanta.protocol import Client @@ -45,6 +54,13 @@ LOGGER = logging.getLogger(__name__) +@dataclass(frozen=True) +class ResourceIntent: + model_version: int + details: ResourceDetails + dependencies: Optional[Mapping[ResourceIdStr, const.ResourceState]] + + class TaskManager(StateUpdateManager, abc.ABC): """ Interface for communication with tasks (deploy.task.Task). Offers methods to inspect intent and to report task results. @@ -65,15 +81,22 @@ def get_types_for_agent(self, agent: str) -> Collection[ResourceType]: async def get_resource_intent( self, resource: ResourceIdStr, - *, - for_deploy: bool = False, - ) -> Optional[tuple[int, ResourceDetails]]: + ) -> Optional[ResourceIntent]: """ Returns the current version and the details for the given resource, or None if it is not (anymore) managed by the scheduler. - :param for_deploy: True iff the task will start deploying this intent. If set, must call report_resource_state later - with deployment result. + Acquires appropriate locks. + """ + + @abstractmethod + async def get_resource_intent_for_deploy( + self, + resource: ResourceIdStr, + ) -> Optional[ResourceIntent]: + """ + Returns the current version details for the given resource along with the last non-deploying state for its dependencies, + or None if it is not (anymore) managed by the scheduler. Acquires appropriate locks. """ @@ -572,24 +595,37 @@ async def is_agent_running(self, name: str) -> bool: # TaskManager interface - async def get_resource_intent( - self, resource: ResourceIdStr, *, for_deploy: bool = False - ) -> Optional[tuple[int, ResourceDetails]]: + def _get_resource_intent(self, resource: ResourceIdStr) -> Optional[ResourceDetails]: + """ + Get intent of a given resource. + Always expected to be called under lock + """ + try: + return self._state.resources[resource] + except KeyError: + # Stale resource + # May occur in rare races between new_version and acquiring the lock we're under here. This race is safe + # because of this check, and an intrinsic part of the locking design because it's preferred over wider + # locking for performance reasons. + return None + + async def get_resource_intent(self, resource: ResourceIdStr) -> Optional[ResourceIntent]: async with self._scheduler_lock: # fetch resource details under lock - try: - result = self._state.version, self._state.resources[resource] - except KeyError: - # Stale resource - # May occur in rare races between new_version and acquiring the lock we're under here. This race is safe - # because of this check, and an intrinsic part of the locking design because it's preferred over wider - # locking for performance reasons. + resource_details = self._get_resource_intent(resource) + if resource_details is None: return None - else: - if for_deploy: - # still under lock => can safely add to non-stale in-progress set - self._deploying_latest.add(resource) - return result + return ResourceIntent(model_version=self._state.version, details=resource_details, dependencies=None) + + async def get_resource_intent_for_deploy(self, resource: ResourceIdStr) -> Optional[ResourceIntent]: + async with self._scheduler_lock: + # fetch resource details under lock + resource_details = self._get_resource_intent(resource) + if resource_details is None: + return None + dependencies = await self._get_last_non_deploying_state_for_dependencies(resource=resource) + self._deploying_latest.add(resource) + return ResourceIntent(model_version=self._state.version, details=resource_details, dependencies=dependencies) async def report_resource_state( self, @@ -647,13 +683,44 @@ async def report_resource_state( deploying=set(), ) + async def _get_last_non_deploying_state_for_dependencies( + self, resource: ResourceIdStr + ) -> Mapping[ResourceIdStr, const.ResourceState]: + """ + Get resource state for every dependency of a given resource from the scheduler state. + The state is then converted to const.ResourceState. + + Should only be called under scheduler lock. + + :param resource: The id of the resource to find the dependencies for + """ + requires_view: Mapping[ResourceIdStr, Set[ResourceIdStr]] = self._state.requires.requires_view() + dependencies: Set[ResourceIdStr] = requires_view.get(resource, set()) + dependencies_state = {} + for dep_id in dependencies: + resource_state_object: ResourceState = self._state.resource_state[dep_id] + match resource_state_object: + case ResourceState(status=ResourceStatus.UNDEFINED): + dependencies_state[dep_id] = const.ResourceState.undefined + case ResourceState(blocked=BlockedStatus.YES): + dependencies_state[dep_id] = const.ResourceState.skipped_for_undefined + case ResourceState(status=ResourceStatus.HAS_UPDATE): + dependencies_state[dep_id] = const.ResourceState.available + case ResourceState(deployment_result=DeploymentResult.SKIPPED): + dependencies_state[dep_id] = const.ResourceState.skipped + case ResourceState(deployment_result=DeploymentResult.DEPLOYED): + dependencies_state[dep_id] = const.ResourceState.deployed + case ResourceState(deployment_result=DeploymentResult.FAILED): + dependencies_state[dep_id] = const.ResourceState.failed + case _: + raise Exception(f"Failed to parse the resource state for {dep_id}: {resource_state_object}") + return dependencies_state + def get_types_for_agent(self, agent: str) -> Collection[ResourceType]: return list(self._state.types_per_agent[agent]) - async def send_in_progress( - self, action_id: UUID, resource_id: ResourceVersionIdStr - ) -> dict[ResourceIdStr, const.ResourceState]: - return await self._state_update_delegate.send_in_progress(action_id, resource_id) + async def send_in_progress(self, action_id: UUID, resource_id: ResourceVersionIdStr) -> None: + await self._state_update_delegate.send_in_progress(action_id, resource_id) async def send_deploy_done(self, result: DeployResult) -> None: return await self._state_update_delegate.send_deploy_done(result) diff --git a/src/inmanta/deploy/tasks.py b/src/inmanta/deploy/tasks.py index 7c3f54101a..077a5b3d2b 100644 --- a/src/inmanta/deploy/tasks.py +++ b/src/inmanta/deploy/tasks.py @@ -116,21 +116,24 @@ async def execute(self, task_manager: "scheduler.TaskManager", agent: str, reaso # First do scheduler book keeping to establish what to do version: int resource_details: "state.ResourceDetails" - intent = await task_manager.get_resource_intent(self.resource, for_deploy=True) + intent = await task_manager.get_resource_intent_for_deploy(self.resource) if intent is None: # Stale resource, can simply be dropped. return - # Resolve to exector form - version, resource_details = intent - executor_resource_details: executor.ResourceDetails = self.get_executor_resource_details(version, resource_details) + # Dependencies are always set when calling get_resource_intent_for_deploy + assert intent.dependencies is not None + # Resolve to exector form + version = intent.model_version + resource_details = intent.details + executor_resource_details: executor.ResourceDetails = self.get_executor_resource_details(version, resource_details) - # Make id's - gid = uuid.uuid4() - action_id = uuid.uuid4() # can this be gid ? + # Make id's + gid = uuid.uuid4() + action_id = uuid.uuid4() # can this be gid ? - # The main difficulty off this code is exception handling - # We collect state here to report back in the finally block + # The main difficulty off this code is exception handling + # We collect state here to report back in the finally block # Full status of the deploy, # may be unset if we fail before signaling start to the server, will be set if we signaled start @@ -142,9 +145,7 @@ async def execute(self, task_manager: "scheduler.TaskManager", agent: str, reaso # Signal start to server try: - requires: dict[ResourceIdStr, const.ResourceState] = await task_manager.send_in_progress( - action_id, executor_resource_details.rvid - ) + await task_manager.send_in_progress(action_id, executor_resource_details.rvid) except Exception: # Unrecoverable, can't reach server scheduler_deployment_result = state.DeploymentResult.FAILED @@ -181,7 +182,9 @@ async def execute(self, task_manager: "scheduler.TaskManager", agent: str, reaso assert reason is not None # Should always be set for deploy # Deploy try: - deploy_result = await my_executor.execute(action_id, gid, executor_resource_details, reason, requires) + deploy_result = await my_executor.execute( + action_id, gid, executor_resource_details, reason, intent.dependencies + ) # Translate deploy result status to the new deployment result state match deploy_result.status: case const.ResourceState.deployed: @@ -282,7 +285,8 @@ async def execute(self, task_manager: "scheduler.TaskManager", agent: str, reaso # Stale resource, can simply be dropped. return # FIXME, should not need resource details, only id, see related FIXME on executor side - version, resource_details = intent + version = intent.model_version + resource_details = intent.details executor_resource_details: executor.ResourceDetails = self.get_executor_resource_details(version, resource_details) try: diff --git a/tests/deploy/test_scheduler_agent.py b/tests/deploy/test_scheduler_agent.py index 5e29addfc0..87dc3fe1bc 100644 --- a/tests/deploy/test_scheduler_agent.py +++ b/tests/deploy/test_scheduler_agent.py @@ -230,9 +230,7 @@ class DummyStateManager(StateUpdateManager): def __init__(self): self.state: dict[ResourceIdStr, const.ResourceState] = {} - async def send_in_progress( - self, action_id: UUID, resource_id: ResourceVersionIdStr - ) -> dict[ResourceIdStr, const.ResourceState]: + async def send_in_progress(self, action_id: UUID, resource_id: ResourceVersionIdStr) -> None: self.state[Id.parse_id(resource_id).resource_str()] = const.ResourceState.deploying async def send_deploy_done(self, result: DeployResult) -> None: