diff --git a/fiftyone/factory/repos/delegated_operation.py b/fiftyone/factory/repos/delegated_operation.py index 79cfcda3c7c..6e4e7f4c849 100644 --- a/fiftyone/factory/repos/delegated_operation.py +++ b/fiftyone/factory/repos/delegated_operation.py @@ -44,6 +44,7 @@ def update_run_state( run_state: ExecutionRunState, result: ExecutionResult = None, run_link: str = None, + log_path: str = None, progress: ExecutionProgress = None, required_state: ExecutionRunState = None, ) -> DelegatedOperationDocument: @@ -121,7 +122,9 @@ def set_log_upload_error( self, _id: ObjectId, log_upload_error: str ) -> DelegatedOperationDocument: """Sets the log upload error for the delegated operation.""" - raise NotImplementedError("subclass must implement set_log_upload_error()") + raise NotImplementedError( + "subclass must implement set_log_upload_error()" + ) def get(self, _id: ObjectId) -> DelegatedOperationDocument: """Get an operation by id.""" @@ -268,6 +271,7 @@ def update_run_state( run_state: ExecutionRunState, result: ExecutionResult = None, run_link: str = None, + log_path: str = None, progress: ExecutionProgress = None, required_state: ExecutionRunState = None, ) -> DelegatedOperationDocument: @@ -338,6 +342,9 @@ def update_run_state( if run_link is not None: update["$set"]["run_link"] = run_link + if log_path is not None: + update["$set"]["log_path"] = log_path + if update is None: raise ValueError("Invalid run_state: {}".format(run_state)) diff --git a/fiftyone/factory/repos/delegated_operation_doc.py b/fiftyone/factory/repos/delegated_operation_doc.py index 55e63a28bfa..9fb69c9576d 100644 --- a/fiftyone/factory/repos/delegated_operation_doc.py +++ b/fiftyone/factory/repos/delegated_operation_doc.py @@ -57,6 +57,7 @@ def __init__( self._doc = None self.metadata = None self.log_upload_error = None + self.log_path = None def from_pymongo(self, doc: dict): # required fields @@ -74,6 +75,7 @@ def from_pymongo(self, doc: dict): self.dataset_id = doc.get("dataset_id", None) self.run_link = doc.get("run_link", None) self.log_upload_error = doc.get("log_upload_error", None) + self.log_path = doc.get("log_path", None) self.metadata = doc.get("metadata", None) self.label = doc.get("label", None) self.updated_at = doc.get("updated_at", None) diff --git a/fiftyone/operators/delegated.py b/fiftyone/operators/delegated.py index 88344a264a6..ff6cb34844b 100644 --- a/fiftyone/operators/delegated.py +++ b/fiftyone/operators/delegated.py @@ -84,6 +84,7 @@ def set_running( doc_id, progress=None, run_link=None, + log_path=None, required_state=None, ): """Sets the given delegated operation to running state. @@ -95,6 +96,7 @@ def set_running( operation run_link (None): an optional link to orchestrator-specific information about the operation + log_path (None): an optional path to the log file for the operation required_state (None): an optional :class:`fiftyone.operators.executor.ExecutionRunState` required state of the operation. If provided, the update will only be @@ -108,6 +110,7 @@ def set_running( _id=doc_id, run_state=ExecutionRunState.RUNNING, run_link=run_link, + log_path=log_path, progress=progress, required_state=required_state, ) @@ -157,6 +160,7 @@ def set_completed( result=None, progress=None, run_link=None, + log_path=None, required_state=None, ): """Sets the given delegated operation to completed state. @@ -171,6 +175,7 @@ def set_completed( operation run_link (None): an optional link to orchestrator-specific information about the operation + log_path (None): an optional path to the log file for the operation required_state (None): an optional :class:`fiftyone.operators.executor.ExecutionRunState` required state of the operation. If provided, the update will only be @@ -187,6 +192,7 @@ def set_completed( result=result, progress=progress, run_link=run_link, + log_path=log_path, required_state=required_state, ) @@ -196,6 +202,7 @@ def set_failed( result=None, progress=None, run_link=None, + log_path=None, required_state=None, ): """Sets the given delegated operation to failed state. @@ -210,6 +217,7 @@ def set_failed( operation run_link (None): an optional link to orchestrator-specific information about the operation + log_path (None): an optional path to the log file for the operation required_state (None): an optional :class:`fiftyone.operators.executor.ExecutionRunState` required state of the operation. If provided, the update will only be @@ -224,6 +232,7 @@ def set_failed( run_state=ExecutionRunState.FAILED, result=result, run_link=run_link, + log_path=log_path, progress=progress, required_state=required_state, ) @@ -263,7 +272,9 @@ def set_log_upload_error(self, doc_id, log_upload_error): Returns: a :class:`fiftyone.factory.repos.DelegatedOperationDocument` """ - return self._repo.set_log_upload_error(_id=doc_id, log_upload_error=log_upload_error) + return self._repo.set_log_upload_error( + _id=doc_id, log_upload_error=log_upload_error + ) def delete_operation(self, doc_id): """Deletes the given delegated operation. @@ -447,7 +458,9 @@ def count(self, filters=None, search=None): """ return self._repo.count(filters=filters, search=search) - def execute_operation(self, operation, log=False, run_link=None): + def execute_operation( + self, operation, log=False, run_link=None, log_path=None + ): """Executes the given delegated operation. Args: @@ -457,6 +470,7 @@ def execute_operation(self, operation, log=False, run_link=None): delegated operations run_link (None): an optional link to orchestrator-specific information about the operation + log_path (None): an optional path to the log file for the operation """ result = None try: @@ -464,6 +478,7 @@ def execute_operation(self, operation, log=False, run_link=None): self.set_running( doc_id=operation.id, run_link=run_link, + log_path=log_path, required_state=ExecutionRunState.QUEUED, ) is not None