Skip to content

Commit

Permalink
[data] Log a warning if the batch size is misconfigured in a way that…
Browse files Browse the repository at this point in the history
… would grossly reduce parallelism for actor pool. (ray-project#34594)
  • Loading branch information
ericl authored and architkulkarni committed May 16, 2023
1 parent 7fd44e6 commit 53a95ab
Showing 1 changed file with 27 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def __init__(
)
self._init_fn = init_fn
self._ray_remote_args = self._apply_default_remote_args(self._ray_remote_args)
self._min_rows_per_bundle = min_rows_per_bundle

# Create autoscaling policy from compute strategy.
self._autoscaling_policy = autoscaling_policy
Expand Down Expand Up @@ -249,6 +250,32 @@ def shutdown(self):
self._actor_pool.kill_all_actors()
super().shutdown()

# Warn if the user specified a batch or block size that prevents full
# parallelization across the actor pool. We only know this information after
# execution has completed.
total_rows = sum([m.num_rows for m in self._output_metadata])
min_workers = self._autoscaling_policy.min_workers
max_desired_batch_size = total_rows // min_workers
if (
self._min_rows_per_bundle is not None
and self._min_rows_per_bundle > max_desired_batch_size
):
# The user specified a batch size, but it was probably too large.
logger.get_logger().warning(
"To ensure full parallelization across an actor pool of size "
f"{min_workers}, the specified batch size "
f"should be at most {max_desired_batch_size}. Your configured batch "
f"size for this operator was {self._min_rows_per_bundle}."
)
elif len(self._output_metadata) < min_workers:
# The user created a stream that has too few blocks to begin with.
logger.get_logger().warning(
"To ensure full parallelization across an actor pool of size "
f"{min_workers}, the Datastream should consist of at least "
f"{min_workers} distinct blocks. Consider increasing "
"the parallelism when creating the Datastream."
)

def get_work_refs(self) -> List[ray.ObjectRef]:
# Work references that we wish the executor to wait on includes both task
# futures AND worker ready futures.
Expand Down

0 comments on commit 53a95ab

Please sign in to comment.