[Serve][Core] Fix serve_long_running memory leak by fixing GCS pubsub asyncio task leak #29187

Merged (11 commits) on Oct 11, 2022
13 changes: 8 additions & 5 deletions python/ray/_private/gcs_pubsub.py
@@ -505,13 +505,16 @@ async def _poll_call(self, req, timeout=None):
     async def _poll(self, timeout=None) -> None:
         req = self._poll_request()
         while len(self._queue) == 0:
-            # TODO: use asyncio.create_task() after Python 3.6 is no longer
-            # supported.
-            poll = asyncio.ensure_future(self._poll_call(req, timeout=timeout))
-            close = asyncio.ensure_future(self._close.wait())
-            done, _ = await asyncio.wait(
+            poll = asyncio.get_event_loop().create_task(
+                self._poll_call(req, timeout=timeout)
+            )
+            close = asyncio.get_event_loop().create_task(self._close.wait())
+            done, not_done = await asyncio.wait(
                 [poll, close], timeout=timeout, return_when=asyncio.FIRST_COMPLETED
             )
+            not_done_task = not_done.pop()
+            if not not_done_task.done():
+                not_done_task.cancel()
             if poll not in done or close in done:
                 # Request timed out or subscriber closed.
                 break
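For context on the fix above: when two tasks are raced with asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED), the task that loses the race keeps running (and holds its references) unless it is cancelled, so every iteration of the subscriber's poll loop used to leave one more pending task behind. A minimal sketch of the cancel-the-loser pattern; the names race_poll_against_close, poll_coro, and close_event are illustrative, not Ray's API:

import asyncio

async def race_poll_against_close(poll_coro, close_event, timeout=None):
    # Sketch only: race a poll coroutine against a close signal and cancel
    # whichever task did not finish, instead of leaving it pending.
    poll = asyncio.get_event_loop().create_task(poll_coro)
    close = asyncio.get_event_loop().create_task(close_event.wait())
    done, not_done = await asyncio.wait(
        [poll, close], timeout=timeout, return_when=asyncio.FIRST_COMPLETED
    )
    # Without this cancellation, the pending task (typically close_event.wait())
    # accumulates once per call, which is the kind of leak this PR addresses.
    for task in not_done:
        task.cancel()
    return done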
30 changes: 30 additions & 0 deletions release/long_running_tests/tpl_cpu_1_c5.yaml
@@ -0,0 +1,30 @@
cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
region: us-west-2

max_workers: 0

head_node_type:
  name: head_node
  instance_type: c5.2xlarge

worker_node_types:
  - name: worker_node
    instance_type: c5.2xlarge
    min_workers: 0
    max_workers: 0
    use_spot: false

aws:
  TagSpecifications:
    - ResourceType: "instance"
      Tags:
        - Key: anyscale-user
          Value: '{{env["ANYSCALE_USER"]}}'
        - Key: anyscale-expiration
          Value: '{{env["EXPIRATION_2D"]}}'

  BlockDeviceMappings:
    - DeviceName: /dev/sda1
      Ebs:
        VolumeSize: 300
        DeleteOnTermination: true
13 changes: 11 additions & 2 deletions release/long_running_tests/workloads/serve.py
@@ -48,6 +48,7 @@ def update_progress(result):
     cluster.add_node(
         redis_port=6379 if i == 0 else None,
         num_redis_shards=NUM_REDIS_SHARDS if i == 0 else None,
+        dashboard_agent_listen_port=(52365 + i),
         num_cpus=8,
         num_gpus=0,
         resources={str(i): 2},
@@ -56,7 +57,7 @@ def update_progress(result):
         dashboard_host="0.0.0.0",
     )

-ray.init(address=cluster.address, dashboard_host="0.0.0.0")
+ray.init(address=cluster.address, log_to_driver=False, dashboard_host="0.0.0.0")
 serve.start()

@@ -101,7 +102,15 @@ async def __call__(self, request):
     )
     proc.wait()
     out, err = proc.communicate()

+    # Check if wrk command succeeded. If this happens repeatedly, the release test
+    # infrastructure will correctly fail the test with "Last update to results json
+    # was too long ago."
+    if proc.returncode != 0:
+        print("wrk failed with the following error: ")
+        print(err)
+        print("Will try again in 5 seconds")
+        time.sleep(5)
+        continue
     # Sample wrk stdout:
     #
     # Running 10s test @ http://127.0.0.1:8000/echo
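The retry added to serve.py above is a plain check-the-return-code-and-retry loop around the wrk subprocess. A standalone sketch of that pattern, assuming a hypothetical helper name and command line (the real invocation and stdout parsing live in workloads/serve.py):

import subprocess
import time

def run_wrk_with_retry(url="http://127.0.0.1:8000/echo", duration="10s"):
    # Hypothetical helper illustrating the retry-on-nonzero-exit pattern.
    while True:
        proc = subprocess.Popen(
            ["wrk", "-c", "10", "-d", duration, url],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            encoding="utf-8",
        )
        proc.wait()
        out, err = proc.communicate()
        if proc.returncode != 0:
            # A transient wrk failure should not kill the long-running test;
            # log it and try again shortly.
            print("wrk failed with the following error:", err)
            time.sleep(5)
            continue
        return out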
2 changes: 1 addition & 1 deletion release/release_tests.yaml
@@ -2090,7 +2090,7 @@
   team: serve
   cluster:
     cluster_env: app_config.yaml
-    cluster_compute: tpl_cpu_1.yaml
+    cluster_compute: tpl_cpu_1_c5.yaml

   run:
     timeout: 86400