[Data] [2/N] Enable optimizer: fix fusion #35621
Conversation
Force-pushed: 1c5ea1b to 4203e78
thanks!
-if DataContext.get_current().optimizer_enabled:
+if (
+    DataContext.get_current().optimizer_enabled
+    # TODO(hchen): Remove this when all operators support local plan.
Suggested change:
-# TODO(hchen): Remove this when all operators support local plan.
+# TODO(hchen): Remove this when all operators support logical plan.
@@ -330,18 +356,29 @@ def fused_all_to_all_transform_fn(
     return op


-def _are_remote_args_compatible(up_args, down_args):
+def _are_remote_args_compatible(prev_args, next_args):
would it be possible to add unit tests for this function?
Good idea. Added in `test_read_map_batches_operator_fusion_compatible_remote_args` and `test_read_map_batches_operator_fusion_incompatible_remote_args`.
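For readers following along, here is a minimal sketch of what a `ray_remote_args` compatibility check for fusion could look like. The normalization rule used here (drop `None`-valued entries, then require equality) is an assumption for illustration only, not the actual logic in `operator_fusion.py`.

```python
# Hedged sketch, not the real operator_fusion.py implementation:
# assume two operators' remote args are fusion-compatible when they agree
# after dropping unset (None) values.
from typing import Any, Dict, Optional


def _are_remote_args_compatible(
    prev_args: Optional[Dict[str, Any]], next_args: Optional[Dict[str, Any]]
) -> bool:
    def canonicalize(args: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        # Treat a missing dict and None-valued entries as "no constraint".
        return {k: v for k, v in (args or {}).items() if v is not None}

    return canonicalize(prev_args) == canonicalize(next_args)


# Compatible: an explicit None num_gpus is treated as unset.
assert _are_remote_args_compatible({"num_cpus": 1, "num_gpus": None}, {"num_cpus": 1})
# Incompatible: the args disagree after normalization.
assert not _are_remote_args_compatible({"scheduling_strategy": "SPREAD"}, {"num_cpus": 1})
```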
@@ -22,7 +22,7 @@ def get_input_data() -> List[RefBundle]:
     get_metadata = cached_remote_fn(get_table_block_metadata)
     metadata = ray.get([get_metadata.remote(t) for t in op._tables])
     ref_bundles: List[RefBundle] = [
-        RefBundle([(table_ref, block_metadata)], owns_blocks=True)
+        RefBundle([(table_ref, block_metadata)], owns_blocks=False)
What are all the `owns_blocks` changes for?
The blocks are put into the object store inside the FromArrowRefs op, so this RefBundle shouldn't own the blocks. This was a bug. This function is used only for the optimizer code path.
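As a side note, a hedged illustration of the `owns_blocks` semantics discussed here. The `RefBundle`/`BlockMetadata` import paths and the metadata field values are assumptions about Ray's internal layout at the time of this PR; the constructor call mirrors the diffs above.

```python
# Hedged sketch of block ownership in a RefBundle (import paths are assumptions).
import ray
from ray.data._internal.execution.interfaces import RefBundle
from ray.data.block import BlockMetadata

block = [{"x": 1}, {"x": 2}]
block_ref = ray.put(block)  # we created this ref ourselves
block_metadata = BlockMetadata(
    num_rows=2,
    size_bytes=64,  # rough placeholder size for this sketch
    schema=None,
    input_files=None,
    exec_stats=None,
)

# owns_blocks=True tells the executor this bundle created the object refs and
# may safely free them once consumed (e.g. refs from our own ray.put above).
# Refs handed in by the caller, as in FromArrowRefs, should use owns_blocks=False
# so the executor never frees objects it doesn't own.
bundle = RefBundle([(block_ref, block_metadata)], owns_blocks=True)
```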
if map_transform_fn:
    upstream_map_fn = lambda block: map_transform_fn(block, ctx)  # noqa: E731
    # If there is a fused upstream operator,
    # also use the ray_remote_args from the fused upstream operator.
    ray_remote_args = ctx.upstream_map_ray_remote_args
is there a test for this in the later PR?
Yes, `test_map_batches_extra_args`, if I remember correctly.
@@ -1730,7 +1730,7 @@ def test_random_shuffle_check_random(shutdown_only):
         prev = x


-def test_random_shuffle_with_custom_resource(ray_start_cluster):
+def test_random_shuffle_with_custom_resource(ray_start_cluster, use_push_based_shuffle):
Doesn't seem like `use_push_based_shuffle` is actually being used by the test?
`use_push_based_shuffle` is actually a fixture that sets `ctx.use_push_based_shuffle` to True/False and runs the test twice. The first time I saw this, I was confused as well. But we do have many such usages already.
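For readers unfamiliar with the pattern, a minimal self-contained sketch of how such a fixture behaves. This is not Ray's actual conftest code; the `_FakeContext` object here is just a stand-in for `DataContext`.

```python
# Minimal sketch of a parametrized pytest fixture that toggles a context flag
# and runs each requesting test once per setting.
import pytest


class _FakeContext:
    # Stand-in for ray.data.DataContext in this sketch.
    use_push_based_shuffle = False


_ctx = _FakeContext()


@pytest.fixture(params=[False, True])
def use_push_based_shuffle(request):
    # Toggle the flag, hand control to the test, then restore the old value.
    original = _ctx.use_push_based_shuffle
    _ctx.use_push_based_shuffle = request.param
    yield request.param
    _ctx.use_push_based_shuffle = original


def test_random_shuffle_with_custom_resource(use_push_based_shuffle):
    # The test body never reads the fixture value directly; simply requesting
    # the fixture makes pytest run this test under both settings.
    assert _ctx.use_push_based_shuffle in (False, True)
```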
thanks!
The failing tests should be fixed by merging in recent changes from the main branch.
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Force-pushed: 5933363 to 1b9acf8
-        up_op, MapOperator
+    if not (
+        (
+            isinstance(up_op, TaskPoolMapOperator)
Nit: should we combine the two cases into a single isinstance check on down_op?
Also, I recall discussing that we will potentially not support this for the Actor case?
Having 2 separate cases looks clearer to me, but I don't have a strong preference.
We are dropping support for the actor->actor case; task->actor is still supported.
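A hedged, self-contained sketch of the fusion rule described here. The stub classes stand in for Ray's internal operator classes, and this is not the actual `operator_fusion.py` logic.

```python
# Sketch of the fusion-eligibility rule: only a task-based upstream
# MapOperator can be fused, while the downstream may be task- or actor-based,
# so task->task and task->actor are allowed and actor->actor is not.
class MapOperator: ...
class TaskPoolMapOperator(MapOperator): ...
class ActorPoolMapOperator(MapOperator): ...


def can_fuse(up_op: MapOperator, down_op: MapOperator) -> bool:
    # Upstream must run on a plain task pool; actor-pool upstreams are not fused.
    if not isinstance(up_op, TaskPoolMapOperator):
        return False
    # Downstream may be either flavor of MapOperator.
    return isinstance(down_op, (TaskPoolMapOperator, ActorPoolMapOperator))


assert can_fuse(TaskPoolMapOperator(), TaskPoolMapOperator())        # task -> task
assert can_fuse(TaskPoolMapOperator(), ActorPoolMapOperator())       # task -> actor
assert not can_fuse(ActorPoolMapOperator(), ActorPoolMapOperator())  # actor -> actor dropped
```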
Signed-off-by: Hao Chen <chenh1024@gmail.com>
@@ -27,7 +27,7 @@ def get_input_data() -> List[RefBundle]:
     blocks, metadata = map(list, zip(*res))
     metadata = ray.get(metadata)
     ref_bundles: List[RefBundle] = [
-        RefBundle([(block, block_metadata)], owns_blocks=True)
+        RefBundle([(block, block_metadata)], owns_blocks=False)
Shouldn't this still be True since ndarray_to_block will create a copy?
Right. Fixed.
@@ -48,7 +48,7 @@ def get_input_data() -> List[RefBundle]:
     )
     block_ref_bundle = RefBundle(
         [(ray.put(block), block_metadata)],
-        owns_blocks=True,
+        owns_blocks=False,
True, since this is created by the ray.put().
@@ -237,6 +237,10 @@ class TaskContext:
     # an AllToAllOperator with an upstream MapOperator.
     upstream_map_transform_fn: Optional["MapTransformFn"] = None

+    # The Ray remote arguments of the fused upstream MapOperator.
+    # This should be set if upstream_map_transform_fn is set.
+    upstream_map_ray_remote_args: Dict[str, Any] = None
Hmm, it's not ideal to pass this at runtime. Ideally, the optimizer would rewrite the downstream op's ray remote args to this value, instead of having each operator decide which of the two args to use by looking at the context.
I agree with you, but currently it's hard to avoid this. For most operators, we are already doing it the way you mentioned. `upstream_map_ray_remote_args`, along with `upstream_map_transform_fn`, is used only for `RandomShuffle`, because the corresponding `AllToAllOperator` physical op doesn't directly do the shuffle itself; instead, it uses `ExchangeTaskScheduler` to launch new tasks to do the shuffle. That's why we need this ad-hoc handling here. I'll add a TODO here.
Update: see `generate_random_shuffle_fn` for more details.
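A hedged, self-contained sketch of this ad-hoc handling. The field names mirror the `TaskContext` diff above, but `plan_shuffle_tasks` is a hypothetical helper for illustration, not the actual `generate_random_shuffle_fn`/`ExchangeTaskScheduler` code.

```python
# Sketch: when a MapOperator is fused into the random-shuffle AllToAllOperator,
# its transform fn and ray_remote_args ride along on TaskContext so the
# shuffle's separately launched tasks can apply them.
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class TaskContext:
    task_idx: int
    # Set only when an upstream MapOperator was fused into this operator.
    upstream_map_transform_fn: Optional[Callable] = None
    # Should be set whenever upstream_map_transform_fn is set.
    upstream_map_ray_remote_args: Dict[str, Any] = field(default_factory=dict)


def plan_shuffle_tasks(ctx: TaskContext, default_remote_args: Dict[str, Any]) -> Dict[str, Any]:
    """Decide which remote args the shuffle tasks should be launched with."""
    if ctx.upstream_map_transform_fn is not None:
        # The fused upstream map runs inside the shuffle tasks, so those tasks
        # inherit the upstream operator's ray_remote_args.
        return ctx.upstream_map_ray_remote_args or default_remote_args
    return default_remote_args


ctx = TaskContext(
    task_idx=0,
    upstream_map_transform_fn=lambda blocks, ctx: blocks,
    upstream_map_ray_remote_args={"num_cpus": 2},
)
assert plan_shuffle_tasks(ctx, {"num_cpus": 1}) == {"num_cpus": 2}
```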
Signed-off-by: Hao Chen <chenh1024@gmail.com>
LGTM pending tests.
Why are these changes needed?

This PR is the 2nd part of enabling the optimizer by default (split from #34937). It fixes the following issues:

- `ray_remote_args` not correctly set for a fused operator.
- `init_fn` not correctly set for a fused operator.
- Allowed cases for fusion (see `operator_fusion.py`).
- `ray_remote_args` compatibility check for fusion.
- Limit operator not handled when converting a logical operator to a physical operator.
- Other small fixes.

Note: some changes in this PR may not be covered by this PR's CI, as the optimizer must be enabled to cover them. But they are already verified in #34937's CI.

Related issue number

#32596
Checks

- I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.