Skip to content

Commit

Permalink
Scheduler/default scheduled (#4871)
Browse files Browse the repository at this point in the history
* Add internal util for checking if remote or local

* Changing default DO status based on internal or remote

* updating scheduled_at or queued_at based on if it's internal

* change util to use method that combines all 3 service checks into one

* remove is internal service method since it's not used in OSS

* update unit tests

* update tests

* reverse scheduled and queued

* update comment

* fix merge issues

* fix merge

* [skip ci] add test for is remote

* fix test

---------

Co-authored-by: Stuart Wheaton <stuart@voxel51.com>
  • Loading branch information
CamronStaley and swheaton authored Oct 17, 2024
1 parent 8feb0c9 commit 7a7dfdd
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 9 deletions.
3 changes: 0 additions & 3 deletions fiftyone/core/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
"""

import argparse
import warnings
from collections import defaultdict
from datetime import datetime
import json
Expand All @@ -19,8 +18,6 @@

import argcomplete
from bson import ObjectId
import humanize
import pytz
from tabulate import tabulate
import webbrowser

Expand Down
5 changes: 3 additions & 2 deletions fiftyone/factory/repos/delegated_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from pymongo import IndexModel
from pymongo.collection import Collection

from fiftyone.internal.util import is_remote_service
from fiftyone.factory import DelegatedOperationPagingParams
from fiftyone.factory.repos import DelegatedOperationDocument
from fiftyone.operators.executor import (
Expand Down Expand Up @@ -134,7 +135,7 @@ def __init__(self, collection: Collection = None):
self._collection = (
collection if collection is not None else self._get_collection()
)

self.is_remote = is_remote_service()
self._create_indexes()

def _get_collection(self) -> Collection:
Expand Down Expand Up @@ -170,7 +171,7 @@ def _create_indexes(self):
self._collection.create_indexes(indices_to_create)

def queue_operation(self, **kwargs: Any) -> DelegatedOperationDocument:
op = DelegatedOperationDocument()
op = DelegatedOperationDocument(is_remote=self.is_remote)
for prop in self.required_props:
if prop not in kwargs:
raise ValueError("Missing required property '%s'" % prop)
Expand Down
11 changes: 7 additions & 4 deletions fiftyone/factory/repos/delegated_operation_doc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(
operator: str = None,
delegation_target: str = None,
context: dict = None,
is_remote: bool = False,
):
self.operator = operator
self.label = None
Expand All @@ -35,18 +36,20 @@ def __init__(
else context
)
self.run_state = (
ExecutionRunState.QUEUED
) # default to queued state on create
ExecutionRunState.SCHEDULED
if is_remote
else ExecutionRunState.QUEUED
) # if running locally use QUEUED otherwise SCHEDULED
self.run_link = None
self.queued_at = datetime.utcnow()
self.queued_at = datetime.utcnow() if not is_remote else None
self.updated_at = datetime.utcnow()
self.status = None
self.dataset_id = None
self.started_at = None
self.pinned = False
self.completed_at = None
self.failed_at = None
self.scheduled_at = None
self.scheduled_at = datetime.utcnow() if is_remote else None
self.result = None
self.id = None
self._doc = None
Expand Down
1 change: 1 addition & 0 deletions fiftyone/internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
"""

from .secrets import *
from .util import is_remote_service
34 changes: 34 additions & 0 deletions fiftyone/internal/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""
FiftyOne internal utilities.
| Copyright 2017-2024, Voxel51, Inc.
| `voxel51.com <https://voxel51.com/>`_
|
"""


def is_remote_service():
"""Whether the SDK is running in a remote service context.
Returns:
True/False
"""
return has_encryption_key() and has_api_key()


def has_encryption_key():
"""Whether the current environment has an encryption key.
Returns:
True/False
"""
return False


def has_api_key():
"""Whether the current environment has an API key.
Returns:
True/False
"""
return False
23 changes: 23 additions & 0 deletions tests/unittests/operators/delegated_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
ExecutionResult,
ExecutionRunState,
)
from fiftyone.factory.repos import delegated_operation
from fiftyone.operators.operator import Operator, OperatorConfig


Expand Down Expand Up @@ -1333,3 +1334,25 @@ async def test_set_completed_in_async_context(

doc = self.svc.set_completed(doc_id=doc.id)
self.assertEqual(doc.run_state, ExecutionRunState.COMPLETED)

@patch.object(
delegated_operation,
"is_remote_service",
return_value=True,
)
def test_queue_op_remote_service(
self, mock_is_remote_service, mock_get_operator, mock_operator_exists
):
db = delegated_operation.MongoDelegatedOperationRepo()
dos = DelegatedOperationService(repo=db)
ctx = ExecutionContext()
ctx.request_params = {"foo": "bar"}
doc = dos.queue_operation(
operator="@voxelfiftyone/operator/foo",
label=mock_get_operator.return_value.name,
delegation_target="test_target",
context=ctx.serialize(),
)
self.docs_to_delete.append(doc)
self.assertTrue(db.is_remote)
self.assertEqual(doc.run_state, ExecutionRunState.SCHEDULED)

0 comments on commit 7a7dfdd

Please sign in to comment.