Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ci][core] Add more visbility into state api stress test #36465

Merged
merged 5 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions release/nightly_tests/stress_tests/test_state_api_scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
list_tasks,
)

import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
logger = logging.getLogger(__file__)

GiB = 1024 * 1024 * 1024
MiB = 1024 * 1024

Expand All @@ -46,7 +51,7 @@ def verify():
def test_many_tasks(num_tasks: int):
TASK_NAME_TEMPLATE = "pi4_sample_{num_tasks}"
if num_tasks == 0:
print("Skipping test with no tasks")
logger.info("Skipping test with no tasks")
return

# No running tasks
Expand Down Expand Up @@ -113,7 +118,7 @@ def pi4_sample():

def test_many_actors(num_actors: int):
if num_actors == 0:
print("Skipping test with no actors")
logger.info("Skipping test with no actors")
return

@ray.remote
Expand All @@ -139,7 +144,7 @@ def exit(self):
]

waiting_actors = [actor.running.remote() for actor in actors]
print("Waiting for actors to finish...")
logger.info("Waiting for actors to finish...")
ray.get(waiting_actors)

invoke_state_api_n(
Expand All @@ -165,7 +170,7 @@ def exit(self):

def test_many_objects(num_objects, num_actors):
if num_objects == 0:
print("Skipping test with no objects")
logger.info("Skipping test with no objects")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we have timestamps.

return

pg = placement_group([{"CPU": 1}] * num_actors, strategy="SPREAD")
Expand All @@ -180,12 +185,17 @@ def __init__(self):
def create_objs(self, num_objects):
import os

for _ in range(num_objects):
for i in range(num_objects):
# Object size shouldn't matter here.
self.objs.append(ray.put(bytearray(os.urandom(1024))))
if i + 1 % 100 == 0:
logger.info(f"Created object {i+1}...")

return self.objs

def ready(self):
pass

actors = [
ObjectActor.options(
scheduling_strategy=PlacementGroupSchedulingStrategy(
Expand All @@ -195,6 +205,10 @@ def create_objs(self, num_objects):
for _ in tqdm.trange(num_actors, desc="Creating actors...")
]

waiting_actors = [actor.ready.remote() for actor in actors]
for _ in tqdm.trange(len(actors), desc="Waiting actors to be ready..."):
_ready, waiting_actors = ray.wait(waiting_actors)

# Splitting objects to multiple actors for creation,
# credit: https://stackoverflow.com/a/2135920
def _split(a, n):
Expand Down Expand Up @@ -234,7 +248,7 @@ def _split(a, n):

def test_large_log_file(log_file_size_byte: int):
if log_file_size_byte == 0:
print("Skipping test with 0 log file size")
logger.info("Skipping test with 0 log file size")
return

import sys
Expand Down Expand Up @@ -376,27 +390,27 @@ def test(
start_time = time.perf_counter()
# Run some long-running tasks
for n in num_tasks_arr:
print(f"\nRunning with many tasks={n}")
logger.info(f"Running with many tasks={n}")
test_many_tasks(num_tasks=n)
print(f"\ntest_many_tasks({n}) PASS")
logger.info(f"test_many_tasks({n}) PASS")

# Run many actors
for n in num_actors_arr:
print(f"\nRunning with many actors={n}")
logger.info(f"Running with many actors={n}")
test_many_actors(num_actors=n)
print(f"\ntest_many_actors({n}) PASS")
logger.info(f"test_many_actors({n}) PASS")

# Create many objects
for n in num_objects_arr:
print(f"\nRunning with many objects={n}")
logger.info(f"Running with many objects={n}")
test_many_objects(num_objects=n, num_actors=num_actors_for_objects)
print(f"\ntest_many_objects({n}) PASS")
logger.info(f"test_many_objects({n}) PASS")

# Create large logs
for n in log_file_size_arr:
print(f"\nRunning with large file={n} bytes")
logger.info(f"Running with large file={n} bytes")
test_large_log_file(log_file_size_byte=n)
print(f"\ntest_large_log_file({n} bytes) PASS")
logger.info(f"test_large_log_file({n} bytes) PASS")

print("\n\nPASS")
end_time = time.perf_counter()
Expand Down
3 changes: 3 additions & 0 deletions release/ray_release/scripts/run_release_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ def main(
no_terminate=no_terminate,
)
return_code = result.return_code
from rich import print

print(result)
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
except ReleaseTestError as e:
logger.exception(e)
return_code = e.exit_code.value
Expand Down
5 changes: 4 additions & 1 deletion release/release_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4791,6 +4791,9 @@
group: core-daily-test
working_dir: nightly_tests

stable: false
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
jailed: true

python: "3.8"
frequency: nightly
team: core
Expand All @@ -4803,7 +4806,7 @@
cluster_compute: stress_tests/stress_tests_compute_large.yaml

run:
timeout: 3600
timeout: 4200
script: python stress_tests/test_state_api_scale.py
wait_for_nodes:
num_nodes: 7
Expand Down