Skip to content

Commit

Permalink
[PlacementGroup]Add PlacementGroup wait python api (ray-project#12601)
Browse files Browse the repository at this point in the history
  • Loading branch information
ffbin authored Dec 7, 2020
1 parent 73a1a23 commit 401d342
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
28 changes: 21 additions & 7 deletions python/ray/tests/test_placement_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def value(self):
"CPU": 2
}
])
ray.get(placement_group.ready())
assert placement_group.wait(10000)
actor_1 = Actor.options(
placement_group=placement_group,
placement_group_bundle_index=0).remote()
Expand Down Expand Up @@ -96,7 +96,7 @@ def value(self):
}, {
"CPU": 2
}])
ray.get(placement_group.ready())
assert placement_group.wait(10000)
actor_1 = Actor.options(
placement_group=placement_group,
placement_group_bundle_index=0).remote()
Expand Down Expand Up @@ -142,7 +142,7 @@ def value(self):
}, {
"CPU": 2
}])
ray.get(placement_group.ready())
assert placement_group.wait(10000)
actor_1 = Actor.options(
placement_group=placement_group,
placement_group_bundle_index=0).remote()
Expand Down Expand Up @@ -192,7 +192,7 @@ def value(self):
}, {
"CPU": 2
}])
ray.get(placement_group.ready())
assert placement_group.wait(10000)
actor_1 = Actor.options(
placement_group=placement_group,
placement_group_bundle_index=0).remote()
Expand Down Expand Up @@ -1177,7 +1177,7 @@ def test_create_placement_group_after_gcs_server_restart(

# Create placement group 1 successfully.
placement_group1 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}])
ray.get(placement_group1.ready(), timeout=10)
assert placement_group1.wait(10000)
table = ray.util.placement_group_table(placement_group1)
assert table["state"] == "CREATED"

Expand All @@ -1187,7 +1187,7 @@ def test_create_placement_group_after_gcs_server_restart(

# Create placement group 2 successfully.
placement_group2 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}])
ray.get(placement_group2.ready(), timeout=10)
assert placement_group2.wait(10000)
table = ray.util.placement_group_table(placement_group2)
assert table["state"] == "CREATED"

Expand Down Expand Up @@ -1250,7 +1250,21 @@ def test_create_placement_group_during_gcs_server_restart(
cluster.head_node.start_gcs_server()

for i in range(0, 100):
ray.get(placement_groups[i].ready())
assert placement_groups[i].wait(10000)


def test_placement_group_wait_api(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)

placement_group = ray.util.placement_group(
name="name", strategy="PACK", bundles=[{
"CPU": 2,
}, {
"CPU": 2
}])
assert placement_group.wait(10000)


if __name__ == "__main__":
Expand Down
15 changes: 15 additions & 0 deletions python/ray/util/placement_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ def ready(self) -> ObjectRef:
placement_group_bundle_index=bundle_index,
resources=resources).remote(self)

def wait(self, timeout_ms: int) -> bool:
"""Wait for the placement group to be ready within the specified time.
Args:
timeout_ms(str): Timeout in milliseconds.
Return:
True if the placement group is created. False otherwise.
"""
worker = ray.worker.global_worker
worker.check_connected()

return worker.core_worker.wait_placement_group_ready(
self.id, timeout_ms)

@property
def bundle_specs(self) -> List[Dict]:
"""List[Dict]: Return bundles belonging to this placement group."""
Expand Down

0 comments on commit 401d342

Please sign in to comment.