Skip to content

Commit

Permalink
Read resource state from scheduler instead of reading from DB (Issue #…
Browse files Browse the repository at this point in the history
…8253, PR #8301)

# Description

Read resource state from scheduler instead of from DB.

closes #8253

# Self Check:

Strike through any lines that are not applicable (`~~line~~`) then check the box

- [x] Attached issue to pull request
- [x] Changelog entry
- [x] Type annotations are present
- [x] Code is clear and sufficiently documented
- [x] No (preventable) type errors (check using make mypy or make mypy-diff)
- [ ] Sufficient test cases (reproduces the bug/tests the requested feature)
- [x] Correct, in line with design
- [ ] End user documentation is included or an issue is created for end-user documentation (add ref to issue here: )
- [ ] If this PR fixes a race condition in the test suite, also push the fix to the relevant stable branche(s) (see [test-fixes](https://internal.inmanta.com/development/core/tasks/build-master.html#test-fixes) for more info)
  • Loading branch information
jptrindade authored and inmantaci committed Nov 14, 2024
1 parent 4091da7 commit 5ade094
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
description: Read resource state from scheduler instead of reading from DB
issue-nr: 8253
change-type: minor
destination-branches: [master]

2 changes: 1 addition & 1 deletion src/inmanta/agent/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ async def execute(
gid: uuid.UUID,
resource_details: ResourceDetails,
reason: str,
requires: dict[ResourceIdStr, const.ResourceState],
requires: Mapping[ResourceIdStr, const.ResourceState],
) -> DeployResult:
"""
Perform the actual deployment of the resource by calling the loaded handler code
Expand Down
5 changes: 3 additions & 2 deletions src/inmanta/agent/forking_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import typing
import uuid
from asyncio import Future, transports
from collections.abc import Mapping
from concurrent.futures import ThreadPoolExecutor
from typing import Awaitable

Expand Down Expand Up @@ -491,7 +492,7 @@ def __init__(
gid: uuid.UUID,
resource_details: "inmanta.agent.executor.ResourceDetails",
reason: str,
requires: dict[ResourceIdStr, const.ResourceState],
requires: Mapping[ResourceIdStr, const.ResourceState],
) -> None:
self.agent_name = agent_name
self.gid = gid
Expand Down Expand Up @@ -811,7 +812,7 @@ async def execute(
gid: uuid.UUID,
resource_details: "inmanta.agent.executor.ResourceDetails",
reason: str,
requires: dict[ResourceIdStr, const.ResourceState],
requires: Mapping[ResourceIdStr, const.ResourceState],
) -> DeployResult:
return await self.call(ExecuteCommand(self.id.agent_name, action_id, gid, resource_details, reason, requires))

Expand Down
5 changes: 3 additions & 2 deletions src/inmanta/agent/in_process_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import uuid
from asyncio import InvalidStateError, Lock
from collections import defaultdict
from collections.abc import Mapping
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Any, Optional

Expand Down Expand Up @@ -141,7 +142,7 @@ async def _execute(
resource: Resource,
gid: uuid.UUID,
ctx: handler.HandlerContext,
requires: dict[ResourceIdStr, const.ResourceState],
requires: Mapping[ResourceIdStr, const.ResourceState],
) -> None:
"""
Get the handler for a given resource and run its ``deploy`` method.
Expand Down Expand Up @@ -220,7 +221,7 @@ async def execute(
gid: uuid.UUID,
resource_details: ResourceDetails,
reason: str,
requires: dict[ResourceIdStr, const.ResourceState],
requires: Mapping[ResourceIdStr, const.ResourceState],
) -> DeployResult:
try:
resource: Resource = Resource.deserialize(resource_details.attributes)
Expand Down
3 changes: 2 additions & 1 deletion src/inmanta/agent/write_barier_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import concurrent
import typing
import uuid
from collections.abc import Mapping
from copy import deepcopy

from inmanta import const
Expand All @@ -39,7 +40,7 @@ async def execute(
gid: uuid.UUID,
resource_details: ResourceDetails,
reason: str,
requires: dict[ResourceIdStr, const.ResourceState],
requires: Mapping[ResourceIdStr, const.ResourceState],
) -> DeployResult:
return await self.delegate.execute(
action_id,
Expand Down
22 changes: 4 additions & 18 deletions src/inmanta/deploy/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from inmanta import const, data
from inmanta.agent.executor import DeployResult, DryrunResult, FactResult
from inmanta.const import TERMINAL_STATES, TRANSIENT_STATES, VALID_STATES_ON_STATE_UPDATE, Change, ResourceState
from inmanta.data.model import AttributeStateChange, ResourceIdStr, ResourceVersionIdStr
from inmanta.data.model import AttributeStateChange, ResourceVersionIdStr
from inmanta.protocol import Client
from inmanta.protocol.exceptions import BadRequest, Conflict, NotFound
from inmanta.resources import Id
Expand All @@ -44,9 +44,7 @@ class StateUpdateManager(abc.ABC):
"""

@abc.abstractmethod
async def send_in_progress(
self, action_id: UUID, resource_id: ResourceVersionIdStr
) -> dict[ResourceIdStr, const.ResourceState]:
async def send_in_progress(self, action_id: UUID, resource_id: ResourceVersionIdStr) -> None:
# FIXME: get rid of version in the id
pass

Expand All @@ -72,17 +70,14 @@ def __init__(self, client: Client, environment: UUID) -> None:
self.client = client
self.environment = environment

async def send_in_progress(
self, action_id: UUID, resource_id: ResourceVersionIdStr
) -> dict[ResourceIdStr, const.ResourceState]:
async def send_in_progress(self, action_id: UUID, resource_id: ResourceVersionIdStr) -> None:
result = await self.client.resource_deploy_start(
tid=self.environment,
rvid=resource_id,
action_id=action_id,
)
if result.code != 200 or result.result is None:
raise Exception("Failed to report the start of the deployment to the server")
return {Id.parse_id(key).resource_str(): const.ResourceState[value] for key, value in result.result["data"].items()}

async def send_deploy_done(self, result: DeployResult) -> None:
changes: dict[ResourceVersionIdStr, dict[str, AttributeStateChange]] = {result.rvid: result.changes}
Expand Down Expand Up @@ -153,9 +148,7 @@ def log_resource_action(self, env: UUID, resource_id: str, log_level: int, ts: d
log_record = resourceservice.ResourceActionLogLine(logger.name, log_level, message, ts)
logger.handle(log_record)

async def send_in_progress(
self, action_id: UUID, resource_id: ResourceVersionIdStr
) -> dict[ResourceIdStr, const.ResourceState]:
async def send_in_progress(self, action_id: UUID, resource_id: ResourceVersionIdStr) -> None:
resource_id_str = resource_id
resource_id_parsed = Id.parse_id(resource_id_str)

Expand Down Expand Up @@ -197,13 +190,6 @@ async def send_in_progress(
# FIXME: we may want to have this in the RPS table instead of Resource table, at some point
await resource.update_fields(connection=connection, status=const.ResourceState.deploying)

# FIXME: shortcut to use scheduler state
result = await data.Resource.get_last_non_deploying_state_for_dependencies(
environment=self.environment, resource_version_id=resource_id_parsed, connection=connection
)

return {Id.parse_id(key).resource_str(): const.ResourceState[value] for key, value in result.items()}

async def send_deploy_done(self, result: DeployResult) -> None:
def error_and_log(message: str, **context: Any) -> None:
"""
Expand Down
117 changes: 92 additions & 25 deletions src/inmanta/deploy/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import uuid
from abc import abstractmethod
from collections.abc import Collection, Mapping, Set
from dataclasses import dataclass
from typing import Optional
from uuid import UUID

Expand All @@ -36,7 +37,15 @@
from inmanta.data.model import ResourceIdStr, ResourceType, ResourceVersionIdStr
from inmanta.deploy import work
from inmanta.deploy.persistence import StateUpdateManager, ToDbUpdateManager
from inmanta.deploy.state import AgentStatus, DeploymentResult, ModelState, ResourceDetails, ResourceState, ResourceStatus
from inmanta.deploy.state import (
AgentStatus,
BlockedStatus,
DeploymentResult,
ModelState,
ResourceDetails,
ResourceState,
ResourceStatus,
)
from inmanta.deploy.tasks import Deploy, DryRun, RefreshFact
from inmanta.deploy.work import PrioritizedTask, TaskPriority
from inmanta.protocol import Client
Expand All @@ -45,6 +54,13 @@
LOGGER = logging.getLogger(__name__)


@dataclass(frozen=True)
class ResourceIntent:
model_version: int
details: ResourceDetails
dependencies: Optional[Mapping[ResourceIdStr, const.ResourceState]]


class TaskManager(StateUpdateManager, abc.ABC):
"""
Interface for communication with tasks (deploy.task.Task). Offers methods to inspect intent and to report task results.
Expand All @@ -65,15 +81,22 @@ def get_types_for_agent(self, agent: str) -> Collection[ResourceType]:
async def get_resource_intent(
self,
resource: ResourceIdStr,
*,
for_deploy: bool = False,
) -> Optional[tuple[int, ResourceDetails]]:
) -> Optional[ResourceIntent]:
"""
Returns the current version and the details for the given resource, or None if it is not (anymore) managed by the
scheduler.
:param for_deploy: True iff the task will start deploying this intent. If set, must call report_resource_state later
with deployment result.
Acquires appropriate locks.
"""

@abstractmethod
async def get_resource_intent_for_deploy(
self,
resource: ResourceIdStr,
) -> Optional[ResourceIntent]:
"""
Returns the current version details for the given resource along with the last non-deploying state for its dependencies,
or None if it is not (anymore) managed by the scheduler.
Acquires appropriate locks.
"""
Expand Down Expand Up @@ -572,24 +595,37 @@ async def is_agent_running(self, name: str) -> bool:

# TaskManager interface

async def get_resource_intent(
self, resource: ResourceIdStr, *, for_deploy: bool = False
) -> Optional[tuple[int, ResourceDetails]]:
def _get_resource_intent(self, resource: ResourceIdStr) -> Optional[ResourceDetails]:
"""
Get intent of a given resource.
Always expected to be called under lock
"""
try:
return self._state.resources[resource]
except KeyError:
# Stale resource
# May occur in rare races between new_version and acquiring the lock we're under here. This race is safe
# because of this check, and an intrinsic part of the locking design because it's preferred over wider
# locking for performance reasons.
return None

async def get_resource_intent(self, resource: ResourceIdStr) -> Optional[ResourceIntent]:
async with self._scheduler_lock:
# fetch resource details under lock
try:
result = self._state.version, self._state.resources[resource]
except KeyError:
# Stale resource
# May occur in rare races between new_version and acquiring the lock we're under here. This race is safe
# because of this check, and an intrinsic part of the locking design because it's preferred over wider
# locking for performance reasons.
resource_details = self._get_resource_intent(resource)
if resource_details is None:
return None
else:
if for_deploy:
# still under lock => can safely add to non-stale in-progress set
self._deploying_latest.add(resource)
return result
return ResourceIntent(model_version=self._state.version, details=resource_details, dependencies=None)

async def get_resource_intent_for_deploy(self, resource: ResourceIdStr) -> Optional[ResourceIntent]:
async with self._scheduler_lock:
# fetch resource details under lock
resource_details = self._get_resource_intent(resource)
if resource_details is None:
return None
dependencies = await self._get_last_non_deploying_state_for_dependencies(resource=resource)
self._deploying_latest.add(resource)
return ResourceIntent(model_version=self._state.version, details=resource_details, dependencies=dependencies)

async def report_resource_state(
self,
Expand Down Expand Up @@ -647,13 +683,44 @@ async def report_resource_state(
deploying=set(),
)

async def _get_last_non_deploying_state_for_dependencies(
self, resource: ResourceIdStr
) -> Mapping[ResourceIdStr, const.ResourceState]:
"""
Get resource state for every dependency of a given resource from the scheduler state.
The state is then converted to const.ResourceState.
Should only be called under scheduler lock.
:param resource: The id of the resource to find the dependencies for
"""
requires_view: Mapping[ResourceIdStr, Set[ResourceIdStr]] = self._state.requires.requires_view()
dependencies: Set[ResourceIdStr] = requires_view.get(resource, set())
dependencies_state = {}
for dep_id in dependencies:
resource_state_object: ResourceState = self._state.resource_state[dep_id]
match resource_state_object:
case ResourceState(status=ResourceStatus.UNDEFINED):
dependencies_state[dep_id] = const.ResourceState.undefined
case ResourceState(blocked=BlockedStatus.YES):
dependencies_state[dep_id] = const.ResourceState.skipped_for_undefined
case ResourceState(status=ResourceStatus.HAS_UPDATE):
dependencies_state[dep_id] = const.ResourceState.available
case ResourceState(deployment_result=DeploymentResult.SKIPPED):
dependencies_state[dep_id] = const.ResourceState.skipped
case ResourceState(deployment_result=DeploymentResult.DEPLOYED):
dependencies_state[dep_id] = const.ResourceState.deployed
case ResourceState(deployment_result=DeploymentResult.FAILED):
dependencies_state[dep_id] = const.ResourceState.failed
case _:
raise Exception(f"Failed to parse the resource state for {dep_id}: {resource_state_object}")
return dependencies_state

def get_types_for_agent(self, agent: str) -> Collection[ResourceType]:
return list(self._state.types_per_agent[agent])

async def send_in_progress(
self, action_id: UUID, resource_id: ResourceVersionIdStr
) -> dict[ResourceIdStr, const.ResourceState]:
return await self._state_update_delegate.send_in_progress(action_id, resource_id)
async def send_in_progress(self, action_id: UUID, resource_id: ResourceVersionIdStr) -> None:
await self._state_update_delegate.send_in_progress(action_id, resource_id)

async def send_deploy_done(self, result: DeployResult) -> None:
return await self._state_update_delegate.send_deploy_done(result)
Expand Down
32 changes: 18 additions & 14 deletions src/inmanta/deploy/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,24 @@ async def execute(self, task_manager: "scheduler.TaskManager", agent: str, reaso
# First do scheduler book keeping to establish what to do
version: int
resource_details: "state.ResourceDetails"
intent = await task_manager.get_resource_intent(self.resource, for_deploy=True)
intent = await task_manager.get_resource_intent_for_deploy(self.resource)
if intent is None:
# Stale resource, can simply be dropped.
return

# Resolve to exector form
version, resource_details = intent
executor_resource_details: executor.ResourceDetails = self.get_executor_resource_details(version, resource_details)
# Dependencies are always set when calling get_resource_intent_for_deploy
assert intent.dependencies is not None
# Resolve to exector form
version = intent.model_version
resource_details = intent.details
executor_resource_details: executor.ResourceDetails = self.get_executor_resource_details(version, resource_details)

# Make id's
gid = uuid.uuid4()
action_id = uuid.uuid4() # can this be gid ?
# Make id's
gid = uuid.uuid4()
action_id = uuid.uuid4() # can this be gid ?

# The main difficulty off this code is exception handling
# We collect state here to report back in the finally block
# The main difficulty off this code is exception handling
# We collect state here to report back in the finally block

# Full status of the deploy,
# may be unset if we fail before signaling start to the server, will be set if we signaled start
Expand All @@ -142,9 +145,7 @@ async def execute(self, task_manager: "scheduler.TaskManager", agent: str, reaso

# Signal start to server
try:
requires: dict[ResourceIdStr, const.ResourceState] = await task_manager.send_in_progress(
action_id, executor_resource_details.rvid
)
await task_manager.send_in_progress(action_id, executor_resource_details.rvid)
except Exception:
# Unrecoverable, can't reach server
scheduler_deployment_result = state.DeploymentResult.FAILED
Expand Down Expand Up @@ -181,7 +182,9 @@ async def execute(self, task_manager: "scheduler.TaskManager", agent: str, reaso
assert reason is not None # Should always be set for deploy
# Deploy
try:
deploy_result = await my_executor.execute(action_id, gid, executor_resource_details, reason, requires)
deploy_result = await my_executor.execute(
action_id, gid, executor_resource_details, reason, intent.dependencies
)
# Translate deploy result status to the new deployment result state
match deploy_result.status:
case const.ResourceState.deployed:
Expand Down Expand Up @@ -282,7 +285,8 @@ async def execute(self, task_manager: "scheduler.TaskManager", agent: str, reaso
# Stale resource, can simply be dropped.
return
# FIXME, should not need resource details, only id, see related FIXME on executor side
version, resource_details = intent
version = intent.model_version
resource_details = intent.details

executor_resource_details: executor.ResourceDetails = self.get_executor_resource_details(version, resource_details)
try:
Expand Down
4 changes: 1 addition & 3 deletions tests/deploy/test_scheduler_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,7 @@ class DummyStateManager(StateUpdateManager):
def __init__(self):
self.state: dict[ResourceIdStr, const.ResourceState] = {}

async def send_in_progress(
self, action_id: UUID, resource_id: ResourceVersionIdStr
) -> dict[ResourceIdStr, const.ResourceState]:
async def send_in_progress(self, action_id: UUID, resource_id: ResourceVersionIdStr) -> None:
self.state[Id.parse_id(resource_id).resource_str()] = const.ResourceState.deploying

async def send_deploy_done(self, result: DeployResult) -> None:
Expand Down

0 comments on commit 5ade094

Please sign in to comment.