[CoreWorker] lazy bind core_work's job_config through task spec. #31375
@liuyang-my the Java test failed, but I'm not quite sure what exactly happened from reading the logs. Do you know what might have gone wrong? (Presumably we are hitting some deadlock issue?) The failed test is https://buildkite.com/ray-project/oss-ci-build-pr/builds/8396#01857127-086b-406b-92da-6f935dcc8447
@@ -298,6 +298,30 @@ message ActorDiedErrorContext {
}
// ---Actor death contexts end----

message JobConfig {
move JobConfig to common.proto to break the circular dependency
@@ -48,12 +48,15 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
std::string task_name =
invocation.name.empty() ? functionDescriptor->DefaultTaskName() : invocation.name;

static rpc::JobConfig kDefaultJobConfig;
Is `static` a premature optimization? Is it possible for this to be called from multiple threads and corrupt the object?
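One way to look at the threading question: since C++11, the initialization of a function-local `static` is thread-safe, but later concurrent writes to the object are not. A minimal sketch, assuming the default config is only ever read; `JobConfigStub` and `DefaultJobConfig` are hypothetical names standing in for `rpc::JobConfig` and are not part of this PR.

```cpp
#include <string>

// Hypothetical stand-in for rpc::JobConfig (the real type is a protobuf message).
struct JobConfigStub {
  std::string serialized_runtime_env;
};

// The function-local static is constructed exactly once. Since C++11 that
// construction is thread-safe even when several threads call this function
// at the same time. Declaring it const and handing out a const reference
// keeps callers from mutating the shared object afterwards, which is the
// part the language does not protect.
const JobConfigStub &DefaultJobConfig() {
  static const JobConfigStub kDefaultJobConfig{};
  return kDefaultJobConfig;
}
```

Every call returns a reference to the same object, so there is no per-call construction cost and no writable shared state.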
Question: will this slow down performance? I think this adds the runtime env to every task spec (previously it was sent only once). Do you mind benchmarking for a perf regression?
Besides this, do you think it would be good to pass the job config to the workers through stdin? That way we could probably limit all the changes to the worker pool. I'm also thinking this may need to be extended in the future, and I believe we don't want to pass everything through the task spec.
Btw, I'm OK with this approach if the benchmark with job config is OK. But let's add a comment to the job config proto to let people know it's passed to all tasks repeatedly.
Still reviewing...
@@ -88,7 +88,7 @@ class AbstractRayRuntime : public RayRuntime {

const TaskID &GetCurrentTaskId();

- const JobID &GetCurrentJobID();
+ JobID GetCurrentJobID();
Why update this one but not the rest?
This calls into the context, which returns by value instead of by reference, so we changed it accordingly.
Changing the rest to return by value is a great idea, but it would double the size of the PR and touch a lot of cpp runtime code, so we prefer not to do it in this PR.
@@ -48,12 +48,15 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
std::string task_name =
invocation.name.empty() ? functionDescriptor->DefaultTaskName() : invocation.name;

rpc::JobConfig kDefaultJobConfig;
nit: make it a global const static variable?
Not sure, but I always feel `kXYZ` names are for global const variables.
src/ray/common/task/task_spec.cc
Outdated
@@ -149,6 +149,8 @@ JobID TaskSpecification::JobId() const {
return JobID::FromBinary(message_->job_id());
}

rpc::JobConfig TaskSpecification::JobConfig() const { return message_->job_config(); }
why not const reference?
+1
rpc::Address address;
spec_builder.SetCommonTaskSpec(id,
"dummy_task",
Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""),
job_id,
config,
nit: `rpc::JobConfig()` seems easier to read. Otherwise we need to check what `config` is in the code.
src/ray/core_worker/context.h
Outdated
JobID current_job_id_ GUARDED_BY(mutex_);
rpc::JobConfig job_config_ GUARDED_BY(mutex_);
should we use optional here if it's lazily initialized?
+1. Why don't we use optional instead of default config?
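The `optional` suggestion can be sketched as follows. This is a minimal illustration, not the code in this PR: `JobConfigStub`, `WorkerContextSketch`, and `MaybeBindJobConfig` are hypothetical names, and a plain struct stands in for the `rpc::JobConfig` protobuf message. The point is that an empty `std::optional` distinguishes "no task received yet" from "a task arrived carrying a default config", which a default-constructed member cannot do.

```cpp
#include <mutex>
#include <optional>
#include <string>

// Hypothetical stand-in for rpc::JobConfig.
struct JobConfigStub {
  std::string serialized_runtime_env;
};

class WorkerContextSketch {
 public:
  // Binds the config the first time a task arrives; later calls leave the
  // stored value untouched. Returns true if this call did the binding.
  bool MaybeBindJobConfig(const JobConfigStub &config) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (job_config_.has_value()) {
      return false;
    }
    job_config_ = config;
    return true;
  }

  // Returns a copy of the bound config, or std::nullopt if no task has
  // been received yet.
  std::optional<JobConfigStub> GetCurrentJobConfig() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return job_config_;
  }

 private:
  mutable std::mutex mutex_;
  std::optional<JobConfigStub> job_config_;  // empty until the first task
};
```

With this shape, callers that run before the first task see `std::nullopt` and have to handle it explicitly instead of silently reading a default config.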
Thanks for reviewing! Kicking off the benchmark here: https://buildkite.com/ray-project/release-tests-pr/builds/24753
This PR doesn't touch the worker pool code. In that case, are the workers started by each job still considered to belong to that job?
Also, I am curious about the behavior changes. Previously,
- When the worker starts, it belongs to the job
- When the job terminates the workers are killed.
With this change, how are these semantics changed?
src/ray/core_worker/context.cc
Outdated
job_config_ = job_config;
}
RAY_CHECK(current_job_id_ == job_id);
RAY_CHECK(google::protobuf::util::MessageDifferencer::Equals(job_config_, job_config_));
Is this necessary? Is the overhead of `MessageDifferencer::Equals` small?
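Note that the quoted check passes `job_config_` as both arguments, so as written it compares the stored config with itself and cannot fail. A minimal sketch of the comparison the check appears to intend, stored config against incoming config, using a plain struct as a stand-in for the protobuf message; `JobConfigStub`, its fields, and `MatchesBoundConfig` are hypothetical names for illustration only.

```cpp
#include <string>

// Hypothetical stand-in for rpc::JobConfig with two illustrative fields.
struct JobConfigStub {
  std::string ray_namespace;
  std::string serialized_runtime_env;

  // Field-by-field equality; the cost grows with the size of the fields,
  // and it would be paid once per received task if checked on every task.
  bool operator==(const JobConfigStub &other) const {
    return ray_namespace == other.ray_namespace &&
           serialized_runtime_env == other.serialized_runtime_env;
  }
};

// Returns true when the config carried by an incoming task matches the one
// the worker already bound. The two arguments must be different objects:
// the stored config and the incoming one.
bool MatchesBoundConfig(const JobConfigStub &bound, const JobConfigStub &incoming) {
  return bound == incoming;
}
```

Whether a full comparison per task is worth its cost is the reviewer's open question; the sketch only makes the operands explicit.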
return current_job_id_;
}

rpc::JobConfig WorkerContext::GetCurrentJobConfig() const {
const reference?
Returning a reference to state guarded by the mutex lets the caller read it outside the critical section, which is undefined behavior if another thread writes to it concurrently.
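The return-by-value choice can be illustrated with a small generic wrapper. This is a sketch under assumptions, not code from this PR: `Guarded` is a hypothetical helper, and the real `WorkerContext` uses its own mutex and `GUARDED_BY` annotations.

```cpp
#include <mutex>
#include <string>
#include <utility>

// Hypothetical wrapper around a value that is shared between threads.
template <typename T>
class Guarded {
 public:
  explicit Guarded(T value) : value_(std::move(value)) {}

  // Copies the value while holding the lock. The caller receives an
  // independent snapshot that stays valid after the lock is released.
  T Get() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return value_;
  }

  void Set(T value) {
    std::lock_guard<std::mutex> lock(mutex_);
    value_ = std::move(value);
  }

  // There is deliberately no `const T &Get() const`: the lock would be
  // released on return, so the caller would read value_ unprotected while
  // another thread may be inside Set().

 private:
  mutable std::mutex mutex_;
  T value_;
};
```

The trade-off is one copy per read, which is the cost the "const reference?" comments are asking about.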
src/ray/core_worker/core_worker.cc
Outdated

if (options_.worker_type == WorkerType::DRIVER &&
!options_.serialized_job_config.empty()) {
// Driver populates job_config through worker startup options.
IIUC the driver is not started with worker startup options?
Maybe it should be "Driver populates the job config via initialization. Workers populate it when the first task is received"?
Requesting changes since Cade has already approved it.
Fix and reopen java tests closed in #31375 Co-authored-by: Marcus Zhang <zxl265370@antgroup.com>
Why are these changes needed?
Previously the worker got its job_config information from the raylet on construction. This prevents us from lazily binding job_config to workers. This PR enables lazy binding of job_config by piggybacking job_config in TaskSpec and initializing the job_config when the worker receives a task execution request (push_task) call.
We also refactor the WorkerContext and RayletClient as part of the change.
Related issue number
Checks
- I've signed off every commit (`git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.