diff --git a/tfx/orchestration/experimental/core/env.py b/tfx/orchestration/experimental/core/env.py index bf04dff145..5c804b52e2 100644 --- a/tfx/orchestration/experimental/core/env.py +++ b/tfx/orchestration/experimental/core/env.py @@ -162,6 +162,10 @@ def get_status_code_from_exception( def maximum_active_task_schedulers(self) -> int: """Returns the maximum number of active task schedulers.""" + @abc.abstractmethod + def get_pipeline_service_address(self) -> Optional[str]: + """Returns the pipeline service address.""" + class _DefaultEnv(Env): """Default environment.""" @@ -251,6 +255,9 @@ def get_status_code_from_exception( def maximum_active_task_schedulers(self) -> int: return 1 + def get_pipeline_service_address(self) -> Optional[str]: + return None + _ENV = _DefaultEnv() diff --git a/tfx/orchestration/experimental/core/env_test.py b/tfx/orchestration/experimental/core/env_test.py index 04f3506482..411b2b7769 100644 --- a/tfx/orchestration/experimental/core/env_test.py +++ b/tfx/orchestration/experimental/core/env_test.py @@ -103,6 +103,9 @@ def should_orchestrate(self, pipeline: pipeline_pb2.Pipeline) -> bool: def maximum_active_task_schedulers(self) -> int: raise NotImplementedError() + def get_pipeline_service_address(self) -> Optional[str]: + raise NotImplementedError() + class EnvTest(test_utils.TfxTest):