Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][state] Task events backend - worker task event buffer implementation [1/n] #30867

Merged
merged 15 commits into from
Dec 7, 2022

Conversation

rickyyx
Copy link
Contributor

@rickyyx rickyyx commented Dec 2, 2022

Why are these changes needed?

For details of the design and background see this doc

Previous PRs:

In this PR:

  • Added a TaskEventBuffer class which serves as an abstraction to store task events, and push those events to GCS in batches periodically.
  • Each CoreWorker will own one single TaskEventBuffer, and events (both task status change events and profiling events) will be added to a local in-memory buffer.
  • The TaskEventBuffer also owns its own GCS client and io thread, which is independent from the main io_contexts.

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: rickyyx <rickyx@anyscale.com>
Signed-off-by: rickyyx <rickyx@anyscale.com>
Signed-off-by: rickyyx <rickyx@anyscale.com>
@rickyyx rickyyx marked this pull request as ready for review December 2, 2022 23:13
Signed-off-by: rickyyx <rickyx@anyscale.com>
@rickyyx
Copy link
Contributor Author

rickyyx commented Dec 2, 2022

Local benchmarking:

image

Some profiling results:

This is a result of running

n = 10000
m = 4
actors = [Actor.remote() for _ in range(m)]


def multi_task():
    submitted = [a.small_value_batch.remote(n) for a in actors]
    ray.get(submitted)

image

image

image

  • The overhead mostly comes from memory related ops (constructing and destructing) of grpc structs when recording status changes in the main io thread.

Copy link
Contributor

@ericl ericl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great--- main comment is around unifying some of the code for SetStatus.

src/ray/core_worker/core_worker.h Outdated Show resolved Hide resolved
src/ray/core_worker/task_manager.cc Outdated Show resolved Hide resolved
src/ray/core_worker/task_manager.cc Outdated Show resolved Hide resolved
@ericl ericl added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Dec 3, 2022
@ericl
Copy link
Contributor

ericl commented Dec 3, 2022

For the performance change, I wonder if we can make the following unrelated optimization and gain back most of the difference: #30872

Basically, serialize the scheduling strategy protos to binary and compare those instead of using the message differencer.

src/ray/core_worker/core_worker.cc Outdated Show resolved Hide resolved
src/ray/core_worker/task_manager.cc Outdated Show resolved Hide resolved
src/ray/core_worker/task_manager.cc Show resolved Hide resolved
src/ray/core_worker/task_event_buffer.h Show resolved Hide resolved
src/ray/core_worker/task_event_buffer.h Outdated Show resolved Hide resolved
src/ray/core_worker/task_event_buffer.h Outdated Show resolved Hide resolved
src/ray/core_worker/task_event_buffer.cc Outdated Show resolved Hide resolved
src/ray/core_worker/task_event_buffer.cc Outdated Show resolved Hide resolved
src/ray/core_worker/task_event_buffer.cc Show resolved Hide resolved
src/ray/core_worker/task_event_buffer.cc Show resolved Hide resolved
Signed-off-by: rickyyx <rickyx@anyscale.com>
Signed-off-by: rickyyx <rickyx@anyscale.com>
Signed-off-by: rickyyx <rickyx@anyscale.com>
rkooo567 pushed a commit that referenced this pull request Dec 14, 2022
…n] (#30979)

Previous PRs:

[core][state] Task events backend: config and interface definitions [0/n]  #30829: Interface and protobuf definitions.
[core][state] Task events backend - split drop count on worker #30953: Splitting of drop count for various events type on worker.
[core][state] Task events backend - worker task event buffer implementation [1/n] #30867: TaskEventBuffer implementation
In this PR:

Added GcsTaskManager that stores the task events on the GCS side.
The GcsTsakManager has its own io service and io thread that's separated from the main rpc thread/io_context. Handling of rpcs will be posted to its own internal io_service.
Implementation for the update path.
Interface for the read path.
Next PRs:

Implementation for the update path of GcsTaskManager
Porting of profiling events
Porting of state api task.
WeichenXu123 pushed a commit to WeichenXu123/ray that referenced this pull request Dec 19, 2022
…tation [1/n] (ray-project#30867)

In this PR:

Added a TaskEventBuffer class which serves as an abstraction to store task events, and push those events to GCS in batches periodically.
Each CoreWorker will own one single TaskEventBuffer, and events (both task status change events and profiling events) will be added to a local in-memory buffer.
The TaskEventBuffer also owns its own GCS client and io thread, which is independent from the main io_contexts.

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
WeichenXu123 pushed a commit to WeichenXu123/ray that referenced this pull request Dec 19, 2022
…n] (ray-project#30979)

Previous PRs:

[core][state] Task events backend: config and interface definitions [0/n]  ray-project#30829: Interface and protobuf definitions.
[core][state] Task events backend - split drop count on worker ray-project#30953: Splitting of drop count for various events type on worker.
[core][state] Task events backend - worker task event buffer implementation [1/n] ray-project#30867: TaskEventBuffer implementation
In this PR:

Added GcsTaskManager that stores the task events on the GCS side.
The GcsTsakManager has its own io service and io thread that's separated from the main rpc thread/io_context. Handling of rpcs will be posted to its own internal io_service.
Implementation for the update path.
Interface for the read path.
Next PRs:

Implementation for the update path of GcsTaskManager
Porting of profiling events
Porting of state api task.

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
ericl pushed a commit that referenced this pull request Dec 21, 2022
**Previous PRs:**
 - #30829: 
 - #30953: 
 - #30867: 
 - #30979: 
 - #30934
 - #31207

**In This PR:** 
- Remove old code for timeline/profiling backend.
ericl pushed a commit that referenced this pull request Dec 23, 2022
**Previous PRs:**
 - #30829: 
 - #30953: 
 - #30867: 
 - #30979: 
 - #30934
 - #31207
**This PR:** 

With the change, 
- `list_tasks` now will return tasks with attempt number as an additional column. 
- `get_task` might return multiple task attempt entries if there are retries. 


There is also some plumbing in the test and in core (esp  in the test logic) given the changes. Major changes in the PR are: 
- Add limit support to `GcsTaskManager`
- Change the state aggregator to get tasks from GCS.
AmeerHajAli pushed a commit that referenced this pull request Jan 12, 2023
…n] (#30979)

Previous PRs:

[core][state] Task events backend: config and interface definitions [0/n]  #30829: Interface and protobuf definitions.
[core][state] Task events backend - split drop count on worker #30953: Splitting of drop count for various events type on worker.
[core][state] Task events backend - worker task event buffer implementation [1/n] #30867: TaskEventBuffer implementation
In this PR:

Added GcsTaskManager that stores the task events on the GCS side.
The GcsTsakManager has its own io service and io thread that's separated from the main rpc thread/io_context. Handling of rpcs will be posted to its own internal io_service.
Implementation for the update path.
Interface for the read path.
Next PRs:

Implementation for the update path of GcsTaskManager
Porting of profiling events
Porting of state api task.
AmeerHajAli pushed a commit that referenced this pull request Jan 12, 2023
**Previous PRs:**
 - #30829: 
 - #30953: 
 - #30867: 
 - #30979: 
 - #30934
 - #31207

**In This PR:** 
- Remove old code for timeline/profiling backend.
AmeerHajAli pushed a commit that referenced this pull request Jan 12, 2023
**Previous PRs:**
 - #30829: 
 - #30953: 
 - #30867: 
 - #30979: 
 - #30934
 - #31207
**This PR:** 

With the change, 
- `list_tasks` now will return tasks with attempt number as an additional column. 
- `get_task` might return multiple task attempt entries if there are retries. 


There is also some plumbing in the test and in core (esp  in the test logic) given the changes. Major changes in the PR are: 
- Add limit support to `GcsTaskManager`
- Change the state aggregator to get tasks from GCS.
tamohannes pushed a commit to ju2ez/ray that referenced this pull request Jan 25, 2023
…tation [1/n] (ray-project#30867)

In this PR:

Added a TaskEventBuffer class which serves as an abstraction to store task events, and push those events to GCS in batches periodically.
Each CoreWorker will own one single TaskEventBuffer, and events (both task status change events and profiling events) will be added to a local in-memory buffer.
The TaskEventBuffer also owns its own GCS client and io thread, which is independent from the main io_contexts.

Signed-off-by: tmynn <hovhannes.tamoyan@gmail.com>
tamohannes pushed a commit to ju2ez/ray that referenced this pull request Jan 25, 2023
…n] (ray-project#30979)

Previous PRs:

[core][state] Task events backend: config and interface definitions [0/n]  ray-project#30829: Interface and protobuf definitions.
[core][state] Task events backend - split drop count on worker ray-project#30953: Splitting of drop count for various events type on worker.
[core][state] Task events backend - worker task event buffer implementation [1/n] ray-project#30867: TaskEventBuffer implementation
In this PR:

Added GcsTaskManager that stores the task events on the GCS side.
The GcsTsakManager has its own io service and io thread that's separated from the main rpc thread/io_context. Handling of rpcs will be posted to its own internal io_service.
Implementation for the update path.
Interface for the read path.
Next PRs:

Implementation for the update path of GcsTaskManager
Porting of profiling events
Porting of state api task.

Signed-off-by: tmynn <hovhannes.tamoyan@gmail.com>
tamohannes pushed a commit to ju2ez/ray that referenced this pull request Jan 25, 2023
…ect#31247)

**Previous PRs:**
 - ray-project#30829: 
 - ray-project#30953: 
 - ray-project#30867: 
 - ray-project#30979: 
 - ray-project#30934
 - ray-project#31207

**In This PR:** 
- Remove old code for timeline/profiling backend.

Signed-off-by: tmynn <hovhannes.tamoyan@gmail.com>
tamohannes pushed a commit to ju2ez/ray that referenced this pull request Jan 25, 2023
**Previous PRs:**
 - ray-project#30829: 
 - ray-project#30953: 
 - ray-project#30867: 
 - ray-project#30979: 
 - ray-project#30934
 - ray-project#31207
**This PR:** 

With the change, 
- `list_tasks` now will return tasks with attempt number as an additional column. 
- `get_task` might return multiple task attempt entries if there are retries. 


There is also some plumbing in the test and in core (esp  in the test logic) given the changes. Major changes in the PR are: 
- Add limit support to `GcsTaskManager`
- Change the state aggregator to get tasks from GCS.

Signed-off-by: tmynn <hovhannes.tamoyan@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants