From e78befb105cd87c04212e34a7c86a8f424077bad Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Mon, 17 Jul 2023 14:22:39 +0900 Subject: [PATCH] [Logging] Switch worker_setup_hook to worker_process_setup_hook (#37247) Change worker_setup_hook -> worker_process_setup_hook Signed-off-by: SangBin Cho --- .../user-guides/configure-logging.md | 6 +++- python/ray/_private/function_manager.py | 11 ++++-- python/ray/_private/ray_constants.py | 8 +++-- python/ray/_private/runtime_env/setup_hook.py | 34 +++++++++++-------- python/ray/_private/worker.py | 6 ++-- python/ray/_private/workers/default_worker.py | 8 +++-- python/ray/includes/common.pxi | 2 +- python/ray/runtime_env/runtime_env.py | 16 ++++----- .../ray/tests/test_runtime_env_setup_func.py | 14 ++++---- 9 files changed, 64 insertions(+), 41 deletions(-) diff --git a/doc/source/ray-observability/user-guides/configure-logging.md b/doc/source/ray-observability/user-guides/configure-logging.md index ad7502e1b569b..15676ef437798 100644 --- a/doc/source/ray-observability/user-guides/configure-logging.md +++ b/doc/source/ray-observability/user-guides/configure-logging.md @@ -384,10 +384,14 @@ ray.get(f.remote("A log message for a task.")) ```{admonition} Caution :class: caution -This is an experimental feature. It doesn't support [Ray Client](ray-client-ref) yet. +This is an experimental feature. The semantic of the API is subject to change. +It doesn't support [Ray Client](ray-client-ref) yet. + +Currently, all the runtime environment passed to a driver (`ray.init(runtime_env={...})`) will be ignored if you specify any runtime environment via [Ray Job Submission](jobs-quickstart) API (`ray job submit --working-dir` or `ray job submit --runtime-env`). ``` Use `worker_process_setup_hook` to apply the new logging configuration to all worker processes within a job. + ```python # driver.py def logging_setup_func(): diff --git a/python/ray/_private/function_manager.py b/python/ray/_private/function_manager.py index 06b2a08ee94f0..bee0ae6fde3f9 100644 --- a/python/ray/_private/function_manager.py +++ b/python/ray/_private/function_manager.py @@ -27,7 +27,11 @@ format_error_message, ) from ray._private.serialization import pickle_dumps -from ray._raylet import JobID, PythonFunctionDescriptor, WORKER_SETUP_HOOK_KEY_NAME_GCS +from ray._raylet import ( + JobID, + PythonFunctionDescriptor, + WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS, +) FunctionExecutionInfo = namedtuple( "FunctionExecutionInfo", ["function", "function_name", "max_calls"] @@ -178,7 +182,8 @@ def export_setup_func( ) -> bytes: """Export the setup hook function and return the key.""" pickled_function = pickle_dumps( - setup_func, f"Cannot serialize the worker_setup_hook {setup_func.__name__}" + setup_func, + "Cannot serialize the worker_process_setup_hook " f"{setup_func.__name__}", ) function_to_run_id = hashlib.shake_128(pickled_function).digest( @@ -187,7 +192,7 @@ def export_setup_func( key = make_function_table_key( # This value should match with gcs_function_manager.h. # Otherwise, it won't be GC'ed. - WORKER_SETUP_HOOK_KEY_NAME_GCS.encode(), + WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS.encode(), # b"FunctionsToRun", self._worker.current_job_id.binary(), function_to_run_id, diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index e25bc0d4038e5..8d834f2b53774 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -436,5 +436,9 @@ def gcs_actor_scheduling_enabled(): "RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING", False ) -WORKER_SETUP_HOOK_ENV_VAR = "__RAY_WORKER_SETUP_HOOK_ENV_VAR" -RAY_WORKER_SETUP_HOOK_LOAD_TIMEOUT_ENV_VAR = "RAY_WORKER_SETUP_HOOK_LOAD_TIMEOUT" +WORKER_PROCESS_SETUP_HOOK_ENV_VAR = "__RAY_WORKER_PROCESS_SETUP_HOOK_ENV_VAR" +RAY_WORKER_PROCESS_SETUP_HOOK_LOAD_TIMEOUT_ENV_VAR = ( + "RAY_WORKER_PROCESS_SETUP_HOOK_LOAD_TIMEOUT" # noqa +) + +RAY_DEFAULT_LABEL_KEYS_PREFIX = "ray.io/" diff --git a/python/ray/_private/runtime_env/setup_hook.py b/python/ray/_private/runtime_env/setup_hook.py index 135252dd46114..afc718dc61cac 100644 --- a/python/ray/_private/runtime_env/setup_hook.py +++ b/python/ray/_private/runtime_env/setup_hook.py @@ -15,7 +15,9 @@ def get_import_export_timeout(): return int( - os.environ.get(ray_constants.RAY_WORKER_SETUP_HOOK_LOAD_TIMEOUT_ENV_VAR, "60") + os.environ.get( + ray_constants.RAY_WORKER_PROCESS_SETUP_HOOK_LOAD_TIMEOUT_ENV_VAR, "60" + ) ) @@ -27,15 +29,15 @@ def _encode_function_key(key: str) -> bytes: return base64.b64decode(key) -def upload_worker_setup_hook_if_needed( +def upload_worker_process_setup_hook_if_needed( runtime_env: Union[Dict[str, Any], RuntimeEnv], worker: "ray.Worker", ) -> Union[Dict[str, Any], RuntimeEnv]: - """Uploads the worker_setup_hook to GCS with a key. + """Uploads the worker_process_setup_hook to GCS with a key. - runtime_env["worker_setup_hook"] is converted to a decoded key + runtime_env["worker_process_setup_hook"] is converted to a decoded key that can load the worker setup hook function from GCS. - I.e., you can use internalKV.Get(runtime_env["worker_setup_hook]) + i.e., you can use internalKV.Get(runtime_env["worker_process_setup_hook]) to access the worker setup hook from GCS. Args: @@ -48,13 +50,13 @@ def upload_worker_setup_hook_if_needed( a string. The given decoder is used to decode the function key. """ - setup_func = runtime_env.get("worker_setup_hook") + setup_func = runtime_env.get("worker_process_setup_hook") if setup_func is None: return runtime_env if not isinstance(setup_func, Callable): raise TypeError( - "worker_setup_hook must be a function, " f"got {type(setup_func)}." + "worker_process_setup_hook must be a function, " f"got {type(setup_func)}." ) # TODO(sang): Support modules. @@ -67,39 +69,41 @@ def upload_worker_setup_hook_if_needed( "Failed to export the setup function." ) from e env_vars = runtime_env.get("env_vars", {}) - assert ray_constants.WORKER_SETUP_HOOK_ENV_VAR not in env_vars, ( - f"The env var, {ray_constants.WORKER_SETUP_HOOK_ENV_VAR}, " + assert ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR not in env_vars, ( + f"The env var, {ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR}, " "is not permitted because it is reserved for the internal use." ) - env_vars[ray_constants.WORKER_SETUP_HOOK_ENV_VAR] = _decode_function_key(key) + env_vars[ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR] = _decode_function_key( + key + ) runtime_env["env_vars"] = env_vars # Note: This field is no-op. We don't have a plugin for the setup hook # because we can implement it simply using an env var. # This field is just for the observability purpose, so we store # the name of the method. - runtime_env["worker_setup_hook"] = setup_func.__name__ + runtime_env["worker_process_setup_hook"] = setup_func.__name__ return runtime_env def load_and_execute_setup_hook( - worker_setup_hook_key: str, + worker_process_setup_hook_key: str, ) -> Optional[str]: """Load the setup hook from a given key and execute. Args: - worker_setup_hook_key: The key to import the setup hook + worker_process_setup_hook_key: The key to import the setup hook from GCS. Returns: An error message if it fails. None if it succeeds. """ - assert worker_setup_hook_key is not None + assert worker_process_setup_hook_key is not None worker = ray._private.worker.global_worker assert worker.connected func_manager = worker.function_actor_manager try: worker_setup_func_info = func_manager.fetch_registered_method( - _encode_function_key(worker_setup_hook_key), + _encode_function_key(worker_process_setup_hook_key), timeout=get_import_export_timeout(), ) except Exception: diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index e841e9532628a..7900e1cb710c5 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -73,7 +73,9 @@ from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR from ray._private.runtime_env.py_modules import upload_py_modules_if_needed from ray._private.runtime_env.working_dir import upload_working_dir_if_needed -from ray._private.runtime_env.setup_hook import upload_worker_setup_hook_if_needed +from ray._private.runtime_env.setup_hook import ( + upload_worker_process_setup_hook_if_needed, +) from ray._private.storage import _load_class from ray._private.utils import get_ray_doc_version from ray.exceptions import ObjectStoreFullError, RayError, RaySystemError, RayTaskError @@ -2170,7 +2172,7 @@ def connect( runtime_env = upload_working_dir_if_needed( runtime_env, scratch_dir, logger=logger ) - runtime_env = upload_worker_setup_hook_if_needed( + runtime_env = upload_worker_process_setup_hook_if_needed( runtime_env, worker, ) diff --git a/python/ray/_private/workers/default_worker.py b/python/ray/_private/workers/default_worker.py index f51db00687a87..ccda3ccec5cc8 100644 --- a/python/ray/_private/workers/default_worker.py +++ b/python/ray/_private/workers/default_worker.py @@ -252,9 +252,11 @@ ray._private.utils.try_import_each_module(module_names_to_import) # If the worker setup function is configured, run it. - worker_setup_hook_key = os.getenv(ray_constants.WORKER_SETUP_HOOK_ENV_VAR) - if worker_setup_hook_key: - error = load_and_execute_setup_hook(worker_setup_hook_key) + worker_process_setup_hook_key = os.getenv( + ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR + ) + if worker_process_setup_hook_key: + error = load_and_execute_setup_hook(worker_process_setup_hook_key) if error is not None: worker.core_worker.exit_worker("system", error) diff --git a/python/ray/includes/common.pxi b/python/ray/includes/common.pxi index fc9b007d827f8..0e14cd275d1ab 100644 --- a/python/ray/includes/common.pxi +++ b/python/ray/includes/common.pxi @@ -30,6 +30,6 @@ cdef class GcsClientOptions: return (self.inner.get()) -WORKER_SETUP_HOOK_KEY_NAME_GCS = str(kWorkerSetupHookKeyName) +WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS = str(kWorkerSetupHookKeyName) RESOURCE_UNIT_SCALING = kResourceUnitScaling STREAMING_GENERATOR_RETURN = kStreamingGeneratorReturn diff --git a/python/ray/runtime_env/runtime_env.py b/python/ray/runtime_env/runtime_env.py index e990725856883..bbeb0e0544f0c 100644 --- a/python/ray/runtime_env/runtime_env.py +++ b/python/ray/runtime_env/runtime_env.py @@ -235,11 +235,11 @@ class MyClass: The `run_options` list spec is here: https://docs.docker.com/engine/reference/run/ env_vars: Environment variables to set. - worker_setup_hook: The setup hook that's called after workers - start and before tasks and actors are scheduled. - The value has to be a callable when passed to the job/task/actor. + worker_process_setup_hook: (Experimental) The setup hook that's + called after workers start and before Tasks and Actors are scheduled. + The value has to be a callable when passed to the Job, Task, or Actor. The callable is then exported and this value is converted to - the setup hook's function name for the observability purpose. + the setup hook's function name for observability. config: config for runtime environment. Either a dict or a RuntimeEnvConfig. Field: (1) setup_timeout_seconds, the timeout of runtime environment creation, timeout is in seconds. @@ -263,7 +263,7 @@ class MyClass: # field which is not supported. We should remove it # with the test. "docker", - "worker_setup_hook", + "worker_process_setup_hook", } extensions_fields: Set[str] = { @@ -281,7 +281,7 @@ def __init__( conda: Optional[Union[Dict[str, str], str]] = None, container: Optional[Dict[str, str]] = None, env_vars: Optional[Dict[str, str]] = None, - worker_setup_hook: Optional[Union[Callable, str]] = None, + worker_process_setup_hook: Optional[Union[Callable, str]] = None, config: Optional[Union[Dict, RuntimeEnvConfig]] = None, _validate: bool = True, **kwargs, @@ -303,8 +303,8 @@ def __init__( runtime_env["env_vars"] = env_vars if config is not None: runtime_env["config"] = config - if worker_setup_hook is not None: - runtime_env["worker_setup_hook"] = worker_setup_hook + if worker_process_setup_hook is not None: + runtime_env["worker_process_setup_hook"] = worker_process_setup_hook if runtime_env.get("java_jars"): runtime_env["java_jars"] = runtime_env.get("java_jars") diff --git a/python/ray/tests/test_runtime_env_setup_func.py b/python/ray/tests/test_runtime_env_setup_func.py index 32c0cf88bb071..f4811d81a3ca8 100644 --- a/python/ray/tests/test_runtime_env_setup_func.py +++ b/python/ray/tests/test_runtime_env_setup_func.py @@ -16,7 +16,7 @@ def configure_logging(level: int): ray.init( num_cpus=1, runtime_env={ - "worker_setup_hook": lambda: configure_logging(logging.DEBUG), + "worker_process_setup_hook": lambda: configure_logging(logging.DEBUG), "env_vars": {"ABC": "123"}, }, ) @@ -53,10 +53,12 @@ def get_env_var(self, key): # ray.get( # f.options( # runtime_env={ - # "worker_setup_hook": lambda: configure_logging(logging.INFO)} + # "worker_process_setup_hook": lambda: configure_logging(logging.INFO)} # ).remote("INFO")) # a = Actor.optinos( - # runtime_env={"worker_setup_hook": lambda: configure_logging(logging.INFO)} + # runtime_env={ + # "worker_process_setup_hook": lambda: configure_logging(logging.INFO) + # } # ).remote("INFO") # assert ray.get(a.__ray_ready__.remote()) @@ -88,7 +90,7 @@ def setup(): ray.init( num_cpus=1, runtime_env={ - "worker_setup_hook": setup, + "worker_process_setup_hook": setup, }, ) @@ -114,7 +116,7 @@ class A: ray.init( num_cpus=0, runtime_env={ - "worker_setup_hook": lambda: print(lock), + "worker_process_setup_hook": lambda: print(lock), }, ) assert "Failed to export the setup function." in str(e.value) @@ -130,7 +132,7 @@ def setup_func(): ray.init( num_cpus=1, runtime_env={ - "worker_setup_hook": setup_func, + "worker_process_setup_hook": setup_func, }, )