Skip to content

Commit

Permalink
Merge pull request #5181 from voxel51/fix/outputs-schema
Browse files Browse the repository at this point in the history
Don't require pipeline updates when setting output schema
  • Loading branch information
brimoor authored Nov 23, 2024
2 parents 03c4e84 + 4e0a5bb commit 99ee8af
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 42 deletions.
13 changes: 2 additions & 11 deletions fiftyone/factory/repos/delegated_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,15 @@ def queue_operation(self, **kwargs: Any) -> DelegatedOperationDocument:
raise ValueError("Missing required property '%s'" % prop)
setattr(op, prop, kwargs.get(prop))

# also set the delegation target (not required)
delegation_target = kwargs.get("delegation_target", None)
if delegation_target:
setattr(op, "delegation_target", delegation_target)

# also set the metadata (not required)
metadata = kwargs.get("metadata", None)
if metadata:
setattr(op, "metadata", metadata)
else:
setattr(op, "metadata", {})

context = None
if isinstance(op.context, dict):
Expand Down Expand Up @@ -262,8 +262,6 @@ def update_run_state(
else None
)

needs_pipeline_update = False

if run_state == ExecutionRunState.COMPLETED:
update = {
"$set": {
Expand All @@ -278,7 +276,6 @@ def update_run_state(
update["$set"]["metadata.outputs_schema"] = (
outputs_schema or {}
)
needs_pipeline_update = True

elif run_state == ExecutionRunState.FAILED:
update = {
Expand Down Expand Up @@ -328,12 +325,6 @@ def update_run_state(
if required_state is not None:
collection_filter["run_state"] = required_state

# Using pipeline update instead of a single update doc fixes a case
# where `metadata` is null and so accessing the dotted field
# `metadata.output_schema` creates the document instead of erroring.
if needs_pipeline_update:
update = [update]

doc = self._collection.find_one_and_update(
filter=collection_filter,
update=update,
Expand Down
32 changes: 1 addition & 31 deletions tests/unittests/operators/delegated_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def test_delegate_operation(
self.assertIsNotNone(doc.queued_at)
self.assertEqual(doc.label, "Mock Operator")
self.assertEqual(doc.run_state, ExecutionRunState.QUEUED)
self.assertIsNone(doc.metadata)
self.assertEqual(doc.metadata, {})

doc2_metadata = {"inputs_schema": {}}
doc2 = self.svc.queue_operation(
Expand All @@ -227,7 +227,6 @@ def test_delegate_operation(
self.assertIsNotNone(doc2.queued_at)
self.assertEqual(doc2.label, "@voxelfiftyone/operator/foo")
self.assertEqual(doc2.run_state, ExecutionRunState.QUEUED)
self.assertIsNotNone(doc2.metadata)
self.assertEqual(doc2.metadata, doc2_metadata)

def test_list_operations(self, mock_get_operator, mock_operator_exists):
Expand Down Expand Up @@ -485,35 +484,6 @@ def test_sets_progress(
self.assertEqual(doc.status.label, "halfway there")
self.assertIsNotNone(doc.status.updated_at)

def test_output_schema_null_metadata(
self, mock_get_operator, mock_operator_exists
):
mock_outputs = MockOutputs()
doc = self.svc.queue_operation(
operator="@voxelfiftyone/operator/foo",
delegation_target="test_target",
context=ExecutionContext(request_params={"foo": "bar"}),
)

# Set metadata to null instead of being unset, to test that corner case
self.svc._repo._collection.find_one_and_update(
{"_id": bson.ObjectId(doc.id)}, {"$set": {"metadata": None}}
)

self.svc.set_completed(
doc.id,
result=ExecutionResult(outputs_schema=mock_outputs.to_json()),
)

doc = self.svc.get(doc_id=doc.id)
self.assertEqual(doc.run_state, ExecutionRunState.COMPLETED)
self.assertEqual(
doc.metadata,
{
"outputs_schema": mock_outputs.to_json(),
},
)

@patch(
"fiftyone.core.odm.utils.load_dataset",
)
Expand Down

0 comments on commit 99ee8af

Please sign in to comment.