Skip to content

Commit

Permalink
Merge pull request #5180 from voxel51/fix/do-from-sdk
Browse files Browse the repository at this point in the history
Fix execute_operator() with delegation
  • Loading branch information
brimoor authored Nov 23, 2024
2 parents 7b079dc + be38d26 commit 03c4e84
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 20 deletions.
10 changes: 5 additions & 5 deletions fiftyone/operators/delegated.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def queue_operation(
- inputs_schema: the schema of the operator's inputs
- outputs_schema: the schema of the operator's outputs
Returns:
a :class:`fiftyone.factory.repos.DelegatedOperationDocument`
"""
Expand Down Expand Up @@ -514,11 +513,12 @@ async def _execute_operator(self, doc):
result = await do_execute_operator(operator, ctx, exhaust=True)

outputs_schema = None
request_params = {**context.request_params, "results": result}
try:
outputs = await resolve_type_with_context(
request_params, "outputs"
)
# Resolve output types now
ctx.request_params["target"] = "outputs"
ctx.request_params["results"] = result

outputs = await resolve_type_with_context(operator, ctx)
if outputs is not None:
outputs_schema = outputs.to_json()
except (AttributeError, Exception):
Expand Down
27 changes: 12 additions & 15 deletions fiftyone/operators/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,31 +379,28 @@ async def resolve_type(registry, operator_uri, request_params):
required_secrets=operator._plugin_secrets,
)
await ctx.resolve_secret_values(operator._plugin_secrets)
try:
return operator.resolve_type(
ctx, request_params.get("target", "inputs")
)
except Exception as e:
return ExecutionResult(error=traceback.format_exc())

return await resolve_type_with_context(operator, ctx)


async def resolve_type_with_context(request_params, target=None):
async def resolve_type_with_context(operator, context):
"""Resolves the "inputs" or "outputs" schema of an operator with the given
context.
Args:
request_params: a dictionary of request parameters
target (None): the target schema ("inputs" or "outputs")
operator: the :class:`fiftyone.operators.Operator`
context: the :class:`ExecutionContext` of an operator
Returns:
the schema of "inputs" or "outputs"
the "inputs" or "outputs" schema
:class:`fiftyone.operators.types.Property` of an operator, or None
"""
computed_target = target or request_params.get("target", None)
computed_request_params = {**request_params, "target": computed_target}
operator_uri = request_params.get("operator_uri", None)
registry = OperatorRegistry()
return await resolve_type(registry, operator_uri, computed_request_params)
try:
return operator.resolve_type(
context, context.request_params.get("target", "inputs")
)
except Exception as e:
return ExecutionResult(error=traceback.format_exc())


async def resolve_execution_options(registry, operator_uri, request_params):
Expand Down

0 comments on commit 03c4e84

Please sign in to comment.