Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add log path field to DO #5467

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion fiftyone/factory/repos/delegated_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def update_run_state(
run_state: ExecutionRunState,
result: ExecutionResult = None,
run_link: str = None,
log_path: str = None,
progress: ExecutionProgress = None,
required_state: ExecutionRunState = None,
) -> DelegatedOperationDocument:
Expand Down Expand Up @@ -121,7 +122,9 @@ def set_log_upload_error(
self, _id: ObjectId, log_upload_error: str
) -> DelegatedOperationDocument:
"""Sets the log upload error for the delegated operation."""
raise NotImplementedError("subclass must implement set_log_upload_error()")
raise NotImplementedError(
"subclass must implement set_log_upload_error()"
)

def get(self, _id: ObjectId) -> DelegatedOperationDocument:
"""Get an operation by id."""
Expand Down Expand Up @@ -268,6 +271,7 @@ def update_run_state(
run_state: ExecutionRunState,
result: ExecutionResult = None,
run_link: str = None,
log_path: str = None,
progress: ExecutionProgress = None,
required_state: ExecutionRunState = None,
) -> DelegatedOperationDocument:
Expand Down Expand Up @@ -338,6 +342,9 @@ def update_run_state(
if run_link is not None:
update["$set"]["run_link"] = run_link

if log_path is not None:
update["$set"]["log_path"] = log_path

if update is None:
raise ValueError("Invalid run_state: {}".format(run_state))

Expand Down
2 changes: 2 additions & 0 deletions fiftyone/factory/repos/delegated_operation_doc.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(
self._doc = None
self.metadata = None
self.log_upload_error = None
self.log_path = None

def from_pymongo(self, doc: dict):
# required fields
Expand All @@ -74,6 +75,7 @@ def from_pymongo(self, doc: dict):
self.dataset_id = doc.get("dataset_id", None)
self.run_link = doc.get("run_link", None)
self.log_upload_error = doc.get("log_upload_error", None)
self.log_path = doc.get("log_path", None)
self.metadata = doc.get("metadata", None)
self.label = doc.get("label", None)
self.updated_at = doc.get("updated_at", None)
Expand Down
19 changes: 17 additions & 2 deletions fiftyone/operators/delegated.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def set_running(
doc_id,
progress=None,
run_link=None,
log_path=None,
required_state=None,
):
"""Sets the given delegated operation to running state.
Expand All @@ -95,6 +96,7 @@ def set_running(
operation
run_link (None): an optional link to orchestrator-specific
information about the operation
log_path (None): an optional path to the log file for the operation
required_state (None): an optional
:class:`fiftyone.operators.executor.ExecutionRunState` required
state of the operation. If provided, the update will only be
Expand All @@ -108,6 +110,7 @@ def set_running(
_id=doc_id,
run_state=ExecutionRunState.RUNNING,
run_link=run_link,
log_path=log_path,
progress=progress,
required_state=required_state,
)
Expand Down Expand Up @@ -157,6 +160,7 @@ def set_completed(
result=None,
progress=None,
run_link=None,
log_path=None,
required_state=None,
):
"""Sets the given delegated operation to completed state.
Expand All @@ -171,6 +175,7 @@ def set_completed(
operation
run_link (None): an optional link to orchestrator-specific
information about the operation
log_path (None): an optional path to the log file for the operation
required_state (None): an optional
:class:`fiftyone.operators.executor.ExecutionRunState` required
state of the operation. If provided, the update will only be
Expand All @@ -187,6 +192,7 @@ def set_completed(
result=result,
progress=progress,
run_link=run_link,
log_path=log_path,
required_state=required_state,
)

Expand All @@ -196,6 +202,7 @@ def set_failed(
result=None,
progress=None,
run_link=None,
log_path=None,
required_state=None,
):
"""Sets the given delegated operation to failed state.
Expand All @@ -210,6 +217,7 @@ def set_failed(
operation
run_link (None): an optional link to orchestrator-specific
information about the operation
log_path (None): an optional path to the log file for the operation
required_state (None): an optional
:class:`fiftyone.operators.executor.ExecutionRunState` required
state of the operation. If provided, the update will only be
Expand All @@ -224,6 +232,7 @@ def set_failed(
run_state=ExecutionRunState.FAILED,
result=result,
run_link=run_link,
log_path=log_path,
progress=progress,
required_state=required_state,
)
Expand Down Expand Up @@ -263,7 +272,9 @@ def set_log_upload_error(self, doc_id, log_upload_error):
Returns:
a :class:`fiftyone.factory.repos.DelegatedOperationDocument`
"""
return self._repo.set_log_upload_error(_id=doc_id, log_upload_error=log_upload_error)
return self._repo.set_log_upload_error(
_id=doc_id, log_upload_error=log_upload_error
)

def delete_operation(self, doc_id):
"""Deletes the given delegated operation.
Expand Down Expand Up @@ -447,7 +458,9 @@ def count(self, filters=None, search=None):
"""
return self._repo.count(filters=filters, search=search)

def execute_operation(self, operation, log=False, run_link=None):
def execute_operation(
self, operation, log=False, run_link=None, log_path=None
):
"""Executes the given delegated operation.

Args:
Expand All @@ -457,13 +470,15 @@ def execute_operation(self, operation, log=False, run_link=None):
delegated operations
run_link (None): an optional link to orchestrator-specific
information about the operation
log_path (None): an optional path to the log file for the operation
"""
result = None
try:
succeeded = (
self.set_running(
doc_id=operation.id,
run_link=run_link,
log_path=log_path,
required_state=ExecutionRunState.QUEUED,
)
is not None
Expand Down
Loading