Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Logging] Switch worker_setup_hook to worker_process_setup_hook #37247

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,14 @@ ray.get(f.remote("A log message for a task."))

```{admonition} Caution
:class: caution
This is an experimental feature. It doesn't support [Ray Client](ray-client-ref) yet.
This is an experimental feature. The semantics of the API are subject to change.
It doesn't support [Ray Client](ray-client-ref) yet.

When you use `ray.init(runtime_env={...})` with a [Ray Job Submission](jobs-quickstart) API (`ray job submit --working-dir` or `ray job submit --runtime-env ...`), the runtime_env passed to `ray.init` is ignored.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
When you use `ray.init(runtime_env={...})` with a [Ray Job Submission](jobs-quickstart) API (`ray job subimt --working-dir` or `ray job submit --runtime-env ...`), runtime_env passed to `ray.init` is ignored.
When you use `ray.init(runtime_env={...})` with a [Ray Job Submission](jobs-quickstart) API (`ray job submit --working-dir` or `ray job submit --runtime-env ...`), runtime_env passed to `ray.init` is ignored.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is hard to parse. Are you saying that these two switches don't work when you just use Ray Jobs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ray job ... is a job API. When you use the job API, the driver runtime env is ignored.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any recommended sentences?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it. Can you take a look? If there's no response by today, I will just merge the PR!

```

Use `worker_process_setup_hook` to apply the new logging configuration to all worker processes within a job.

```python
# driver.py
def logging_setup_func():
Expand Down
11 changes: 8 additions & 3 deletions python/ray/_private/function_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
format_error_message,
)
from ray._private.serialization import pickle_dumps
from ray._raylet import JobID, PythonFunctionDescriptor, WORKER_SETUP_HOOK_KEY_NAME_GCS
from ray._raylet import (
JobID,
PythonFunctionDescriptor,
WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS,
)

FunctionExecutionInfo = namedtuple(
"FunctionExecutionInfo", ["function", "function_name", "max_calls"]
Expand Down Expand Up @@ -178,7 +182,8 @@ def export_setup_func(
) -> bytes:
"""Export the setup hook function and return the key."""
pickled_function = pickle_dumps(
setup_func, f"Cannot serialize the worker_setup_hook {setup_func.__name__}"
setup_func,
"Cannot serialize the worker_process_setup_hook " f"{setup_func.__name__}",
)

function_to_run_id = hashlib.shake_128(pickled_function).digest(
Expand All @@ -187,7 +192,7 @@ def export_setup_func(
key = make_function_table_key(
# This value should match with gcs_function_manager.h.
# Otherwise, it won't be GC'ed.
WORKER_SETUP_HOOK_KEY_NAME_GCS.encode(),
WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS.encode(),
# b"FunctionsToRun",
self._worker.current_job_id.binary(),
function_to_run_id,
Expand Down
6 changes: 4 additions & 2 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,9 @@ def gcs_actor_scheduling_enabled():
"RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING", False
)

WORKER_SETUP_HOOK_ENV_VAR = "__RAY_WORKER_SETUP_HOOK_ENV_VAR"
RAY_WORKER_SETUP_HOOK_LOAD_TIMEOUT_ENV_VAR = "RAY_WORKER_SETUP_HOOK_LOAD_TIMEOUT"
WORKER_PROCESS_SETUP_HOOK_ENV_VAR = "__RAY_WORKER_PROCESS_SETUP_HOOK_ENV_VAR"
RAY_WORKER_PROCESS_SETUP_HOOK_LOAD_TIMEOUT_ENV_VAR = (
"RAY_WORKER_PROCESS_SETUP_HOOK_LOAD_TIMEOUT" # noqa
)

RAY_DEFAULT_LABEL_KEYS_PREFIX = "ray.io/"
34 changes: 19 additions & 15 deletions python/ray/_private/runtime_env/setup_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

def get_import_export_timeout():
return int(
os.environ.get(ray_constants.RAY_WORKER_SETUP_HOOK_LOAD_TIMEOUT_ENV_VAR, "60")
os.environ.get(
ray_constants.RAY_WORKER_PROCESS_SETUP_HOOK_LOAD_TIMEOUT_ENV_VAR, "60"
)
)


Expand All @@ -27,15 +29,15 @@ def _encode_function_key(key: str) -> bytes:
return base64.b64decode(key)


def upload_worker_setup_hook_if_needed(
def upload_worker_process_setup_hook_if_needed(
runtime_env: Union[Dict[str, Any], RuntimeEnv],
worker: "ray.Worker",
) -> Union[Dict[str, Any], RuntimeEnv]:
"""Uploads the worker_setup_hook to GCS with a key.
"""Uploads the worker_process_setup_hook to GCS with a key.

runtime_env["worker_setup_hook"] is converted to a decoded key
runtime_env["worker_process_setup_hook"] is converted to a decoded key
that can load the worker setup hook function from GCS.
I.e., you can use internalKV.Get(runtime_env["worker_setup_hook])
I.e., you can use internalKV.Get(runtime_env["worker_process_setup_hook"])
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
to access the worker setup hook from GCS.

Args:
Expand All @@ -48,13 +50,13 @@ def upload_worker_setup_hook_if_needed(
a string. The given decoder is used to decode the function
key.
"""
setup_func = runtime_env.get("worker_setup_hook")
setup_func = runtime_env.get("worker_process_setup_hook")
if setup_func is None:
return runtime_env

if not isinstance(setup_func, Callable):
raise TypeError(
"worker_setup_hook must be a function, " f"got {type(setup_func)}."
"worker_process_setup_hook must be a function, " f"got {type(setup_func)}."
)
# TODO(sang): Support modules.

Expand All @@ -67,39 +69,41 @@ def upload_worker_setup_hook_if_needed(
"Failed to export the setup function."
) from e
env_vars = runtime_env.get("env_vars", {})
assert ray_constants.WORKER_SETUP_HOOK_ENV_VAR not in env_vars, (
f"The env var, {ray_constants.WORKER_SETUP_HOOK_ENV_VAR}, "
assert ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR not in env_vars, (
f"The env var, {ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR}, "
"is not permitted because it is reserved for the internal use."
)
env_vars[ray_constants.WORKER_SETUP_HOOK_ENV_VAR] = _decode_function_key(key)
env_vars[ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR] = _decode_function_key(
key
)
runtime_env["env_vars"] = env_vars
# Note: This field is a no-op. We don't have a plugin for the setup hook
# because we can implement it simply using an env var.
# This field is just for observability purposes, so we store
# the name of the method.
runtime_env["worker_setup_hook"] = setup_func.__name__
runtime_env["worker_process_setup_hook"] = setup_func.__name__
return runtime_env


def load_and_execute_setup_hook(
worker_setup_hook_key: str,
worker_process_setup_hook_key: str,
) -> Optional[str]:
"""Load the setup hook from a given key and execute.

Args:
worker_setup_hook_key: The key to import the setup hook
worker_process_setup_hook_key: The key to import the setup hook
from GCS.
Returns:
An error message if it fails. None if it succeeds.
"""
assert worker_setup_hook_key is not None
assert worker_process_setup_hook_key is not None
worker = ray._private.worker.global_worker
assert worker.connected

func_manager = worker.function_actor_manager
try:
worker_setup_func_info = func_manager.fetch_registered_method(
_encode_function_key(worker_setup_hook_key),
_encode_function_key(worker_process_setup_hook_key),
timeout=get_import_export_timeout(),
)
except Exception:
Expand Down
6 changes: 4 additions & 2 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@
from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR
from ray._private.runtime_env.py_modules import upload_py_modules_if_needed
from ray._private.runtime_env.working_dir import upload_working_dir_if_needed
from ray._private.runtime_env.setup_hook import upload_worker_setup_hook_if_needed
from ray._private.runtime_env.setup_hook import (
upload_worker_process_setup_hook_if_needed,
)
from ray._private.storage import _load_class
from ray._private.utils import get_ray_doc_version
from ray.exceptions import ObjectStoreFullError, RayError, RaySystemError, RayTaskError
Expand Down Expand Up @@ -2170,7 +2172,7 @@ def connect(
runtime_env = upload_working_dir_if_needed(
runtime_env, scratch_dir, logger=logger
)
runtime_env = upload_worker_setup_hook_if_needed(
runtime_env = upload_worker_process_setup_hook_if_needed(
runtime_env,
worker,
)
Expand Down
8 changes: 5 additions & 3 deletions python/ray/_private/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,11 @@
ray._private.utils.try_import_each_module(module_names_to_import)

# If the worker setup function is configured, run it.
worker_setup_hook_key = os.getenv(ray_constants.WORKER_SETUP_HOOK_ENV_VAR)
if worker_setup_hook_key:
error = load_and_execute_setup_hook(worker_setup_hook_key)
worker_process_setup_hook_key = os.getenv(
ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR
)
if worker_process_setup_hook_key:
error = load_and_execute_setup_hook(worker_process_setup_hook_key)
if error is not None:
worker.core_worker.exit_worker("system", error)

Expand Down
2 changes: 1 addition & 1 deletion python/ray/includes/common.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ cdef class GcsClientOptions:
return <CGcsClientOptions*>(self.inner.get())


WORKER_SETUP_HOOK_KEY_NAME_GCS = str(kWorkerSetupHookKeyName)
WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS = str(kWorkerSetupHookKeyName)
RESOURCE_UNIT_SCALING = kResourceUnitScaling
STREAMING_GENERATOR_RETURN = kStreamingGeneratorReturn
12 changes: 6 additions & 6 deletions python/ray/runtime_env/runtime_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ class MyClass:
The `run_options` list spec is here:
https://docs.docker.com/engine/reference/run/
env_vars: Environment variables to set.
worker_setup_hook: The setup hook that's called after workers
start and before tasks and actors are scheduled.
worker_process_setup_hook: (Experimental) The setup hook that's
called after workers start and before tasks and actors are scheduled.
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
The value has to be a callable when passed to the job/task/actor.
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
The callable is then exported and this value is converted to
the setup hook's function name for the observability purpose.
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -263,7 +263,7 @@ class MyClass:
# field which is not supported. We should remove it
# with the test.
"docker",
"worker_setup_hook",
"worker_process_setup_hook",
}

extensions_fields: Set[str] = {
Expand All @@ -281,7 +281,7 @@ def __init__(
conda: Optional[Union[Dict[str, str], str]] = None,
container: Optional[Dict[str, str]] = None,
env_vars: Optional[Dict[str, str]] = None,
worker_setup_hook: Optional[Union[Callable, str]] = None,
worker_process_setup_hook: Optional[Union[Callable, str]] = None,
config: Optional[Union[Dict, RuntimeEnvConfig]] = None,
_validate: bool = True,
**kwargs,
Expand All @@ -303,8 +303,8 @@ def __init__(
runtime_env["env_vars"] = env_vars
if config is not None:
runtime_env["config"] = config
if worker_setup_hook is not None:
runtime_env["worker_setup_hook"] = worker_setup_hook
if worker_process_setup_hook is not None:
runtime_env["worker_process_setup_hook"] = worker_process_setup_hook

if runtime_env.get("java_jars"):
runtime_env["java_jars"] = runtime_env.get("java_jars")
Expand Down
14 changes: 8 additions & 6 deletions python/ray/tests/test_runtime_env_setup_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def configure_logging(level: int):
ray.init(
num_cpus=1,
runtime_env={
"worker_setup_hook": lambda: configure_logging(logging.DEBUG),
"worker_process_setup_hook": lambda: configure_logging(logging.DEBUG),
"env_vars": {"ABC": "123"},
},
)
Expand Down Expand Up @@ -53,10 +53,12 @@ def get_env_var(self, key):
# ray.get(
# f.options(
# runtime_env={
# "worker_setup_hook": lambda: configure_logging(logging.INFO)}
# "worker_process_setup_hook": lambda: configure_logging(logging.INFO)}
# ).remote("INFO"))
# a = Actor.options(
# runtime_env={"worker_setup_hook": lambda: configure_logging(logging.INFO)}
# runtime_env={
# "worker_process_setup_hook": lambda: configure_logging(logging.INFO)
# }
# ).remote("INFO")
# assert ray.get(a.__ray_ready__.remote())

Expand Down Expand Up @@ -88,7 +90,7 @@ def setup():
ray.init(
num_cpus=1,
runtime_env={
"worker_setup_hook": setup,
"worker_process_setup_hook": setup,
},
)

Expand All @@ -114,7 +116,7 @@ class A:
ray.init(
num_cpus=0,
runtime_env={
"worker_setup_hook": lambda: print(lock),
"worker_process_setup_hook": lambda: print(lock),
},
)
assert "Failed to export the setup function." in str(e.value)
Expand All @@ -130,7 +132,7 @@ def setup_func():
ray.init(
num_cpus=1,
runtime_env={
"worker_setup_hook": setup_func,
"worker_process_setup_hook": setup_func,
},
)

Expand Down