Skip to content

Commit

Permalink
store delegated operation IO in metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
imanjra committed Jun 4, 2024
1 parent 7d92e03 commit d7421c0
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 13 deletions.
31 changes: 24 additions & 7 deletions fiftyone/factory/repos/delegated_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
| `voxel51.com <https://voxel51.com/>`_
|
"""

import logging
from datetime import datetime
from typing import Any, List
Expand Down Expand Up @@ -43,6 +44,7 @@ def update_run_state(
result: ExecutionResult = None,
run_link: str = None,
progress: ExecutionProgress = None,
outputs_schema: dict = None,
) -> DelegatedOperationDocument:
"""Update the run state of an operation."""
raise NotImplementedError("subclass must implement update_run_state()")
Expand Down Expand Up @@ -163,6 +165,11 @@ def queue_operation(self, **kwargs: Any) -> DelegatedOperationDocument:
if delegation_target:
setattr(op, "delegation_target", delegation_target)

# also set the metadata (not required)
metadata = kwargs.get("metadata", None)
if metadata:
setattr(op, "metadata", metadata)

context = None
if isinstance(op.context, dict):
context = ExecutionContext(
Expand Down Expand Up @@ -221,6 +228,7 @@ def update_run_state(
result: ExecutionResult = None,
run_link: str = None,
progress: ExecutionProgress = None,
outputs_schema: dict = None,
) -> DelegatedOperationDocument:
update = None

Expand All @@ -234,20 +242,29 @@ def update_run_state(
"run_state": run_state,
"completed_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
"result": execution_result.to_json()
if execution_result
else None,
"result": (
execution_result.to_json()
if execution_result
else None
),
}
}
if outputs_schema:
update["$set"]["metadata.outputs_schema"] = {
"$ifNull": [outputs_schema, {}]
}

elif run_state == ExecutionRunState.FAILED:
update = {
"$set": {
"run_state": run_state,
"failed_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
"result": execution_result.to_json()
if execution_result
else None,
"result": (
execution_result.to_json()
if execution_result
else None
),
}
}
elif run_state == ExecutionRunState.RUNNING:
Expand All @@ -271,7 +288,7 @@ def update_run_state(

doc = self._collection.find_one_and_update(
filter={"_id": _id},
update=update,
update=[update],
return_document=pymongo.ReturnDocument.AFTER,
)

Expand Down
4 changes: 4 additions & 0 deletions fiftyone/factory/repos/delegated_operation_doc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
| `voxel51.com <https://voxel51.com/>`_
|
"""

import logging
from datetime import datetime

Expand Down Expand Up @@ -48,6 +49,7 @@ def __init__(
self.result = None
self.id = None
self._doc = None
self.metadata = None

def from_pymongo(self, doc: dict):
# required fields
Expand Down Expand Up @@ -102,6 +104,8 @@ def from_pymongo(self, doc: dict):
self.id = doc["_id"]
self._doc = doc

self.metadata = doc["metadata"] if "metadata" in doc else None

return self

def to_pymongo(self) -> dict:
Expand Down
31 changes: 29 additions & 2 deletions fiftyone/operators/delegated.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
| `voxel51.com <https://voxel51.com/>`_
|
"""

import asyncio
import logging
import traceback
Expand All @@ -16,6 +17,7 @@
do_execute_operator,
ExecutionResult,
ExecutionRunState,
resolve_type_with_context,
)


Expand All @@ -32,7 +34,12 @@ def __init__(self, repo=None):
self._repo = repo

def queue_operation(
self, operator, label=None, delegation_target=None, context=None
self,
operator,
label=None,
delegation_target=None,
context=None,
metadata=None,
):
"""Queues the given delegated operation for execution.
Expand All @@ -43,6 +50,10 @@ def queue_operation(
the operator if not supplied)
context (None): an
:class:`fiftyone.operators.executor.ExecutionContext`
metadata (None): an optional metadata dict containing properties below:
- inputs_schema: the schema of the operator's inputs
- outputs_schema: the schema of the operator's outputs
Returns:
a :class:`fiftyone.factory.repos.DelegatedOperationDocument`
Expand All @@ -52,6 +63,7 @@ def queue_operation(
label=label if label else operator,
delegation_target=delegation_target,
context=context,
metadata=metadata,
)

def set_progress(self, doc_id, progress):
Expand Down Expand Up @@ -112,12 +124,27 @@ def set_completed(
Returns:
a :class:`fiftyone.factory.repos.DelegatedOperationDocument`
"""

outputs_schema = None
try:
doc = self._repo.get(_id=doc_id)
outputs = asyncio.run(
resolve_type_with_context(doc.context, "outputs")
)
outputs_schema = outputs.to_json()
except:
logger.warning(
"Failed to resolve output schema for the operation."
+ traceback.format_exc(),
)

return self._repo.update_run_state(
_id=doc_id,
run_state=ExecutionRunState.COMPLETED,
result=result,
progress=progress,
run_link=run_link,
outputs_schema=outputs_schema,
)

def set_failed(
Expand Down Expand Up @@ -382,5 +409,5 @@ async def _execute_operator(self, doc):
if isinstance(prepared, ExecutionResult):
raise prepared.to_exception()

operator, _, ctx = prepared
operator, _, ctx, __ = prepared
return await do_execute_operator(operator, ctx, exhaust=True)
32 changes: 30 additions & 2 deletions fiftyone/operators/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ async def execute_or_delegate_operator(
if isinstance(prepared, ExecutionResult):
raise prepared.to_exception()
else:
operator, executor, ctx = prepared
operator, executor, ctx, inputs = prepared

execution_options = operator.resolve_execution_options(ctx)
if (
Expand Down Expand Up @@ -252,11 +252,21 @@ async def execute_or_delegate_operator(
try:
from .delegated import DelegatedOperationService

metadata = {"inputs_schema": None, "outputs_schema": None}

try:
metadata["inputs_schema"] = inputs.to_json()
except Exception as e:
logger.warning(
f"Failed to resolve inputs schema for the operation: {str(e)}"
)

op = DelegatedOperationService().queue_operation(
operator=operator.uri,
context=ctx.serialize(),
delegation_target=ctx.delegation_target,
label=operator.name,
metadata=metadata,
)

execution = ExecutionResult(
Expand Down Expand Up @@ -312,7 +322,7 @@ async def prepare_operator_executor(
error="Validation error", validation_ctx=validation_ctx
)

return operator, executor, ctx
return operator, executor, ctx, inputs


async def do_execute_operator(operator, ctx, exhaust=False):
Expand Down Expand Up @@ -366,6 +376,24 @@ async def resolve_type(registry, operator_uri, request_params):
return ExecutionResult(error=traceback.format_exc())


async def resolve_type_with_context(context, target: str = None):
    """Resolves the "inputs" or "outputs" schema of an operator using the
    request parameters carried by the given context.

    Args:
        context: the :class:`ExecutionContext` of an operator; its
            ``request_params`` supply the ``operator_uri`` and, when
            ``target`` is not given, the ``target`` to resolve
        target (None): the target schema ("inputs" or "outputs"). A truthy
            value takes precedence over any ``target`` already present in
            the context's request params

    Returns:
        whatever :func:`resolve_type` returns for the operator, i.e. the
        schema of "inputs" or "outputs"
        :class:`fiftyone.operators.types.Property` of an operator, or None
    """
    # Work on a copy so the caller's context is never mutated
    params = {**context.request_params}
    params["target"] = target or params.get("target", None)

    # A fresh registry is built per call; the URI comes from the same params
    # that are forwarded to the resolver
    return await resolve_type(
        OperatorRegistry(),
        params.get("operator_uri", None),
        params,
    )


async def resolve_execution_options(registry, operator_uri, request_params):
"""Resolves the execution options of the operator with the given name.
Expand Down
1 change: 0 additions & 1 deletion fiftyone/operators/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
resolve_type,
resolve_placement,
resolve_execution_options,
ExecutionContext,
)
from .message import GeneratedMessage
from .permissions import PermissionedOperatorRegistry
Expand Down
39 changes: 38 additions & 1 deletion tests/unittests/delegated_operators_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
| `voxel51.com <https://voxel51.com/>`_
|
"""

import time
import unittest
from unittest import mock
Expand All @@ -26,7 +27,6 @@
ExecutionContext,
ExecutionResult,
ExecutionRunState,
ExecutionProgress,
)
from fiftyone.operators.operator import Operator, OperatorConfig

Expand All @@ -44,6 +44,16 @@ def delete(self):
pass


class MockInputs:
    """Test double standing in for an operator's resolved inputs."""

    def to_json(self):
        """Returns a fixed JSON-serializable inputs schema."""
        field_schema = {"type": "string"}
        return {"inputs": field_schema}


class MockOutputs:
    """Test double standing in for an operator's resolved outputs."""

    def to_json(self):
        """Returns a fixed JSON-serializable outputs schema."""
        field_schema = {"type": "string"}
        return {"outputs": field_schema}


class MockOperator(Operator):
def __init__(self, success=True, sets_progress=False, **kwargs):
self.success = success
Expand Down Expand Up @@ -73,6 +83,14 @@ def execute(self, ctx):
return ExecutionResult(result={"executed": True})


class MockOperatorWithIO(MockOperator):
    """A :class:`MockOperator` whose input and output resolution yield the
    mock schema objects, so tests can check the schemas stored in a
    delegated operation's metadata.
    """

    def resolve_input(self, *args, **kwargs):
        """Returns a new :class:`MockInputs`, ignoring all arguments."""
        inputs = MockInputs()
        return inputs

    def resolve_output(self, *args, **kwargs):
        """Returns a new :class:`MockOutputs`, ignoring all arguments."""
        outputs = MockOutputs()
        return outputs


class MockGeneratorOperator(Operator):
def __init__(self, success=True, sets_progress=False, **kwargs):
self.success = success
Expand Down Expand Up @@ -171,18 +189,23 @@ def test_delegate_operation(
self.assertIsNotNone(doc.queued_at)
self.assertEqual(doc.label, "Mock Operator")
self.assertEqual(doc.run_state, ExecutionRunState.QUEUED)
self.assertIsNone(doc.metadata)

doc2_metadata = {"inputs_schema": {}}
doc2 = self.svc.queue_operation(
operator="@voxelfiftyone/operator/foo",
delegation_target="foo",
context=ExecutionContext(
request_params={"foo": "bar", "dataset_name": dataset_name},
),
metadata=doc2_metadata,
)
self.docs_to_delete.append(doc2)
self.assertIsNotNone(doc2.queued_at)
self.assertEqual(doc2.label, "@voxelfiftyone/operator/foo")
self.assertEqual(doc2.run_state, ExecutionRunState.QUEUED)
self.assertIsNotNone(doc2.metadata)
self.assertEqual(doc2.metadata, doc2_metadata)

def test_list_queued_operations(
self, mock_get_operator, mock_operator_exists
Expand Down Expand Up @@ -276,14 +299,21 @@ def test_list_queued_operations(
def test_set_run_states(
self, mock_load_dataset, mock_get_operator, mock_operator_exists
):
mock_inputs = MockInputs()
mock_outputs = MockOutputs()
mock_load_dataset.return_value = MockDataset()
mock_get_operator.return_value = MockOperatorWithIO()
doc = self.svc.queue_operation(
operator="@voxelfiftyone/operator/foo",
label=mock_get_operator.return_value.name,
delegation_target=f"test_target",
context=ExecutionContext(
request_params={"foo": "bar", "dataset_id": str(ObjectId())}
),
metadata={"inputs_schema": mock_inputs.to_json()},
)
self.assertEqual(
doc.metadata, {"inputs_schema": mock_inputs.to_json()}
)

original_updated_at = doc.updated_at
Expand All @@ -301,6 +331,13 @@ def test_set_run_states(
doc = self.svc.set_completed(doc_id=doc.id)
self.assertEqual(doc.run_state, ExecutionRunState.COMPLETED)
self.assertNotEqual(doc.updated_at, original_updated_at)
self.assertEqual(
doc.metadata,
{
"inputs_schema": mock_inputs.to_json(),
"outputs_schema": mock_outputs.to_json(),
},
)
original_updated_at = doc.updated_at
time.sleep(0.1)

Expand Down

0 comments on commit d7421c0

Please sign in to comment.