[Cherry-pick][Streaming Generator] Fix a reference leak when pinning requests are … #35794

Contributor

@rkooo567 rkooo567 commented May 25, 2023

…received after refs are consumed. (#35712)

When we put a new object or an object is spilled, the raylet sends an RPC to the owner. For example, it asks the owner to pin the plasma object until the ref goes out of scope.

For non-generator tasks, we always create the return references up front, so we can handle these RPCs properly. However, when a generator task is used, we don't know the references ahead of time, so the owner is not guaranteed to own a reference when the RPC is received. In this case, we take ownership of the reference before it is reported from the executor. See the code below for more details.

ray/src/ray/core_worker/core_worker.cc

Line 3292 in 5acf41e

reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, generator_id);
However, this code is error-prone and causes reference leaks. Here are some examples. "WRITE 0" means the generator task return is written to stream index 0, "READ 0" means we read index 0 from the stream, and PinObjectRPCRecieved means the raylet RPC is received.

Example 1
WRITE 0
READ 0
PinObjectRPCRecieved
-> The reference is already consumed when PinObjectRPCRecieved arrives. In this case, we shouldn't add a reference to the object; otherwise it will leak, because we cannot read this ref anymore (it has already been consumed).

Example 2
PinObjectRPCRecieved
In this case, WRITE 0 fails. So when the object is owned because of PinObjectRPCRecieved, it will never be cleaned up.
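
The two orderings above can be reproduced with a small toy model. This is only a sketch: `NaiveOwner`, `LeakedAfterLatePin`, and `LeakedAfterFailedWrite` are hypothetical names made up for illustration, object IDs are reduced to plain stream indices, and none of this is Ray's actual reference counter. It just shows why owning the ref unconditionally on the pin RPC leaves one reference behind in both examples.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>

// Toy model (hypothetical, not Ray's ReferenceCounter): an object is
// identified by its stream index, and "owning" it means tracking it here.
class NaiveOwner {
 public:
  // WRITE: the executor reports a return value at `index`.
  void Write(int64_t index) { owned_.insert(index); }

  // READ: the caller consumes the ref at `index`, so it goes out of scope.
  void Read(int64_t index) {
    assert(owned_.count(index) == 1 && "can only read a written index");
    owned_.erase(index);
  }

  // Pin RPC from the raylet: the naive owner takes ownership unconditionally
  // so the request always has a reference to attach to.
  void OnPinObjectRpc(int64_t index) { owned_.insert(index); }

  // Number of references that are still tracked.
  std::size_t NumOwned() const { return owned_.size(); }

 private:
  std::set<int64_t> owned_;
};

// Example 1: WRITE 0, READ 0, then the pin RPC arrives late.
inline std::size_t LeakedAfterLatePin() {
  NaiveOwner owner;
  owner.Write(0);
  owner.Read(0);
  owner.OnPinObjectRpc(0);  // Re-creates a ref that nobody can read again.
  return owner.NumOwned();
}

// Example 2: the pin RPC arrives, but WRITE 0 never succeeds.
inline std::size_t LeakedAfterFailedWrite() {
  NaiveOwner owner;
  owner.OnPinObjectRpc(0);  // Nothing will ever consume or release this ref.
  return owner.NumOwned();
}
```

In both functions the returned count is nonzero even though no consumer can reach the ref anymore, which is the leak described in the examples.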

To handle all these cases, we introduce a new API, TemporarilyInsertToStreamIfNeeded. This API owns the object only when

  • the corresponding ref was never consumed, and
  • the stream has not been deleted.

It then adds the object ref to the temporary refs until it is reported. If the report fails, all the references are removed when the stream is deleted.
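
A minimal sketch of the rule described above, under stated assumptions: `StreamSketch` and its members are hypothetical, object IDs are reduced to stream indices, and the real TemporarilyInsertToStreamIfNeeded in the PR may track state differently. In particular, returning false for an index that was already reported is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>

// Toy model of one generator stream (hypothetical, not Ray's data structures).
class StreamSketch {
 public:
  // Pin RPC path. Takes temporary ownership only if the index was never
  // consumed and the stream still exists. Returns true if ownership was taken.
  bool TemporarilyInsertIfNeeded(int64_t index) {
    if (deleted_ || consumed_.count(index) > 0) return false;
    if (reported_.count(index) > 0) return false;  // Already owned (assumption).
    temporary_.insert(index);
    return true;
  }

  // Executor report path (WRITE): a temporary ref becomes a regular entry.
  void Write(int64_t index) {
    if (deleted_) return;
    assert(consumed_.count(index) == 0 && "index was already consumed");
    temporary_.erase(index);
    reported_.insert(index);
  }

  // Consumer path (READ): the ref is handed out and leaves the stream.
  bool Read(int64_t index) {
    if (deleted_ || reported_.erase(index) == 0) return false;
    consumed_.insert(index);
    return true;
  }

  // Deleting the stream releases everything, including temporary refs whose
  // report never arrived.
  void Delete() {
    temporary_.clear();
    reported_.clear();
    deleted_ = true;
  }

  // Number of references the stream currently keeps alive.
  std::size_t NumOwned() const { return temporary_.size() + reported_.size(); }

 private:
  std::set<int64_t> temporary_;  // Owned because of a pin RPC, not yet reported.
  std::set<int64_t> reported_;   // Written by the executor, not yet read.
  std::set<int64_t> consumed_;   // Already read by the caller.
  bool deleted_ = false;
};
```

With this model, Example 1 (WRITE 0, READ 0, pin RPC) leaves nothing owned because the late pin request is rejected, and Example 2 (pin RPC, failed WRITE 0) is cleaned up once `Delete()` runs.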

Why are these changes needed?

Related issue number

#35634

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

rkooo567 added 2 commits May 25, 2023 04:54
…received after refs are consumed. (ray-project#35712)

Signed-off-by: SangBin Cho <rkooo567@gmail.com>
…x-ref-reported-from-plasma-bug

Signed-off-by: SangBin Cho <rkooo567@gmail.com>
Collaborator

@zhe-thoughts zhe-thoughts left a comment


Let's pick and be careful about issues. @ArturNiederfahrenhorst @can-anyscale. Thanks!

@rkooo567 rkooo567 added the tests-ok The tagger certifies test failures are unrelated and assumes personal liability. label May 26, 2023
@ArturNiederfahrenhorst ArturNiederfahrenhorst merged commit c3c36cc into ray-project:releases/2.5.0 May 26, 2023