Use detached lifetime for stats actor #25271

Merged
merged 18 commits into ray-project:master on Aug 12, 2022

Conversation

jianoaix
Contributor

@jianoaix jianoaix commented May 28, 2022

Why are these changes needed?

The actor handle held by the Ray client becomes dangling if the Ray cluster is shut down, and in that case trying to get the actor again results in a crash. This happened to a real user and blocked them from making progress.

This change makes the stats actor detached, and instead of keeping a handle, we access it via its name. This way the actor is re-created if the cluster gets restarted.
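
For illustration, here is a minimal sketch of the get-or-create pattern this change moves to. The class body, actor name, and namespace below are assumptions, not the exact values used in ray.data:

    import ray

    @ray.remote(num_cpus=0)
    class _StatsActor:
        # Illustrative stand-in for the real stats actor.
        def __init__(self):
            self.metadata = {}

    def _get_or_create_stats_actor():
        # A detached, named actor in a fixed namespace survives driver exits
        # and is simply re-created on demand after a cluster restart.
        return _StatsActor.options(
            name="datasets_stats_actor",   # assumed name, not the exact one used
            namespace="dataset_stats",     # fixed namespace so only one actor exists per cluster
            get_if_exists=True,            # return the existing actor if already created
            lifetime="detached",           # not tied to the creating driver's lifetime
        ).remote()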

Related issue number

Closes #25237

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

-# Actor handle, job id the actor was created for.
-_stats_actor = [None, None]
+# Actor handle, job id, client id the actor was created for.
+_stats_actor = [None, None, None]


def _get_or_create_stats_actor():
Contributor

Hmm, I don't remember why we did it this way. Would it be better to use actor.options(get_if_exists=True, name="_dataset_stats_actor").remote() instead to get-or-create the actor?

Contributor Author

I thought about changing to this option (getting by name) as well, but it doesn't seem to meet the requirement.
The requirement here is that this actor is reused across datasets created by the same process. For the get-by-name approach:

  • It may work for the same dataset, but after it's completed and the actor refcount goes to zero, datasets created later will not be able to get that same actor (it has to be re-created).
  • Alternatively, we may create a detached actor, but in that case it will be shared even across different driver processes.

Keeping a reference here and clearing it upon a new connection or a new driver is essentially what this PR does to serve this requirement (a rough sketch follows below).
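
For context, a rough sketch of that cache-and-clear scheme, reusing the illustrative _StatsActor from the earlier sketch; the real helper also tracks the Ray client connection, which is omitted here for brevity:

    import ray

    # Module-level cache: [actor handle, job id the actor was created for].
    _stats_actor = [None, None]

    def _get_or_create_stats_actor():
        global _stats_actor
        job_id = ray.get_runtime_context().job_id
        if _stats_actor[0] is None or _stats_actor[1] != job_id:
            # New driver (or first use): drop any stale handle and create a fresh actor.
            _stats_actor = [_StatsActor.remote(), job_id]
        return _stats_actor[0]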

Contributor

Hm, sharing across drivers should be ok, right? That should be an uncommon case anyway.

Contributor Author

If that's not a concern, then getting by name is indeed simpler.

Contributor

So this stats actor will never be cleaned up for the lifetime of the Ray cluster, and the read stats for each Dataset will never get cleaned up within the actor, which is a bit of a leaky lifecycle. This seems fine for now to unblock the Ray Client use case, but we should probably open a P2 to improve the stats actor lifecycle, or eliminate the stats actor all together, if possible.

Contributor Author

My previous code stayed quite close to the existing approach. I think the feedback here is that it's a bit complicated and that using a detached actor isn't a concern. It may make sense to have a new kind of lifetime between refcounted and detached, e.g. a per-job-lifetime actor.

@ericl ericl added the @author-action-required label May 28, 2022
@jianoaix jianoaix removed the @author-action-required label May 31, 2022
@jianoaix jianoaix changed the title Null out dangling stats actor handle held at Ray client Use detached lifetime for stats actor May 31, 2022
Contributor

@ericl ericl left a comment

Ah also need to set the namespace, otherwise you'll leak one actor per job.

@ericl ericl added the @author-action-required label May 31, 2022
@clarkzinzow
Contributor

@ericl @jianoaix So we can assume that the get_if_exists=True path is ~as cheap as the global actor handle cache since the core worker should cache actor handle fetches, right? It looks as if that path should only involve an RPC on the first fetch for a given worker process.

@ericl
Contributor

ericl commented May 31, 2022

Hmm that's a good question, we should validate this for sure prior to merging.

@jianoaix
Contributor Author

jianoaix commented Jun 2, 2022

Compared to the data loading in a read task, one RPC seems like a small cost?

Do we have a test to measure the impact of this?

@ericl
Contributor

ericl commented Jun 2, 2022

Simple enough to run a trivial dataset workload with small blocks before/after this PR. Maybe something like a 10000-block range + map_batches?

@jianoaix
Contributor Author

jianoaix commented Jun 2, 2022

Tried a simple test like this:

    import time

    import ray.data

    total_time = 0
    for _ in range(16):
        start_time = time.time()
        ds = ray.data.range(100000, parallelism=10000)
        ds.map_batches(lambda x: x)
        total_time += time.time() - start_time  # accumulate across runs
    print("mean time:", total_time / 16)

On a local cluster with 8 nodes and 1 cpu/node:

from ray.cluster_utils import Cluster


def build_cluster(num_nodes, num_cpus):
    cluster = Cluster()
    for _ in range(num_nodes):
        cluster.add_node(num_cpus=num_cpus)
    cluster.wait_for_nodes()
    return cluster


cluster = build_cluster(8, 1)

With this PR: mean time: 2.742537647485733
Without this PR: mean time: 2.706667184829712
The difference is 1.33%, which is small given that the blocks are very small (just 10 ints each). But the nodes are all on a laptop, so the RPC might be cheaper than in a real cluster.

Contributor

@ericl ericl left a comment

Good enough for me!

@clarkzinzow
Contributor

clarkzinzow commented Jun 2, 2022

Also, small nit - this docstring comment on the actor scope is not accurate anymore:

This actor is shared across all datasets created by the same process.

@jianoaix
Contributor Author

jianoaix commented Jun 2, 2022

Microbenchmark:

    import time

    import ray.data.impl.stats

    start_time = time.time()
    for _ in range(1000):
        ah = ray.data.impl.stats._get_or_create_stats_actor()
    print("mean time to get:", (time.time() - start_time) / 1000)

Before: 1.4783143997192383e-05 (sec)
After: 0.0005355322360992432 (sec)
Diff: 36x increase

@jianoaix jianoaix removed the @author-action-required label Aug 2, 2022
@jianoaix jianoaix added the @author-action-required and tests-ok labels and removed the @author-action-required label Aug 3, 2022
@jianoaix
Contributor Author

jianoaix commented Aug 3, 2022

I was looking at the test failures, but it turned out they were already on the flaky test list. So the PR is ready to review/merge.

@jianoaix
Contributor Author

jianoaix commented Aug 3, 2022

@ericl @clarkzinzow ptal, thanks

Contributor

@clarkzinzow clarkzinzow left a comment

LGTM, although I am a bit concerned about the leaky lifecycle of a long-lived detached actor. This stats actor will live for the lifetime of the cluster, and read stats won't be cleaned up for the lifetime of the cluster.

This could be done as a follow-up (and shouldn't block merging this), but what do you think about adding best-effort clean-up of these read stats when DatasetStats is destructed?

@ray.remote(num_cpus=0)
class _StatsActor:
    # ...

    def clear(self, stats_uuid: str):
        self.metadata.pop(stats_uuid, None)
        self.last_time.pop(stats_uuid, None)
        self.start_time.pop(stats_uuid, None)

class DatasetStats:
    def __del__(self):
        if self.needs_stats_actor:
            self.stats_actor.clear.remote(self.stats_uuid)

@jianoaix
Contributor Author

jianoaix commented Aug 4, 2022

Not sure it'll work: DatasetStats, as a Python/local object, can have multiple instances across the cluster, so we can't clean up the actor's entry on each destruction.

@clarkzinzow
Contributor

@jianoaix Ah good point, yeah I forgot that stats can be sent around to other tasks. It doesn't seem like there's a good garbage collection point at the moment. 🤔

@@ -502,7 +504,9 @@ def _submit_task(
         self, task_idx: int
     ) -> Tuple[ObjectRef[MaybeBlockPartition], ObjectRef[BlockPartitionMetadata]]:
         """Submit the task with index task_idx."""
-        stats_actor = _get_or_create_stats_actor()
+        if self._stats_actor is None:
Contributor

Do we need the null check, since it is already using get_if_exists=True?

Contributor Author

This is actually the cached actor handle in this class, which was initialized to None, so it's None on the first call here.
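
For illustration, a hedged sketch of that per-instance caching pattern; the class and method body here are placeholders, not the real ray.data code:

    class LazyBlockListSketch:
        # Placeholder for the real class that submits read tasks.
        def __init__(self):
            self._stats_actor = None  # cached handle, None until first use

        def _submit_task(self, task_idx: int):
            if self._stats_actor is None:
                # Get-or-create the named actor once, then reuse the handle so the
                # lookup RPC is not paid on every task submission.
                self._stats_actor = _get_or_create_stats_actor()
            stats_actor = self._stats_actor
            # ... submit the read task, passing stats_actor so it can record stats ...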

@scv119
Contributor

scv119 commented Aug 11, 2022

Do we need to cherry-pick this to the 2.0.0 release branch?

@jianoaix
Contributor Author

Do we need to cherry-pick this to the 2.0.0 release branch?

We haven't heard of any issues other than from the user who originally reported this (holding a Ray client and shutting down/restarting the cluster over multiple trials), so we probably do not need to pick it.

@jianoaix
Contributor Author

Synced to head and CI passed, @clarkzinzow

@clarkzinzow clarkzinzow merged commit b1cad0a into ray-project:master Aug 12, 2022
Stefan-1313 pushed a commit to Stefan-1313/ray_mod that referenced this pull request Aug 18, 2022
@ericl
Contributor

ericl commented Oct 11, 2022 via email

Labels
tests-ok The tagger certifies test failures are unrelated and assumes personal liability.

Successfully merging this pull request may close these issues.

[Core] Dangling actor handle held at Ray client
7 participants