[data] New executor [7/n]--- bare bones implementation of StreamingExecutor #31579

Merged 24 commits on Jan 17, 2023.

Changes from 1 commit:
fix finalization
ericl committed Jan 12, 2023
commit 3cf4a6ee84c827136cb6e9c0ac690fe61145f635
7 changes: 3 additions & 4 deletions demo.py
@@ -6,15 +6,14 @@
 def sleep(x):
     import time

-    time.sleep(1)
+    time.sleep(0.1)
     return x


 for x in (
-    ray.data.range(50, parallelism=20)
+    ray.data.range(50, parallelism=2)
     .map(sleep, num_cpus=0.3)
     .map(sleep, num_cpus=0.4)
     .map(sleep, num_cpus=0.5)
     .random_shuffle()
     .iter_rows()
 ):
     print("OUTPUT", x)

9 changes: 4 additions & 5 deletions python/ray/data/_internal/execution/bulk_executor.py
@@ -101,9 +101,8 @@ def _naive_run_until_complete(op: PhysicalOperator) -> List[RefBundle]:
                 bar.update(1)
                 output.append(op.get_next())
         bar.close()
-    # An operator is finished only after it has no remaining work as well as no
-    # remaining outputs.
-    while op.has_next():
-        output.append(op.get_next())
-    assert not op.get_work_refs(), "Should not have any remaining work"
+    else:
+        while op.has_next():
+            output.append(op.get_next())
+    assert op.completed(), "Should have finished execution of the op."
     return output

4 changes: 4 additions & 0 deletions python/ray/data/_internal/execution/interfaces.py
@@ -146,6 +146,10 @@ def input_dependencies(self) -> List["PhysicalOperator"]:
         ), "PhysicalOperator.__init__() was not called."
         return self._input_dependencies

+    def completed(self) -> bool:
+        """Return True when this operator is done and all outputs are taken."""
+        raise NotImplementedError
+
     def get_stats(self) -> StatsDict:
         """Return recorded execution stats for use with DatasetStats."""
         raise NotImplementedError

python/ray/data/_internal/execution/operators/all_to_all_operator.py
@@ -34,20 +34,26 @@ def __init__(
         self._num_outputs = num_outputs
         self._input_buffer: List[RefBundle] = []
         self._output_buffer: List[RefBundle] = []
+        self._completed = False
         self._stats: StatsDict = {}
         super().__init__(name, [input_op])

     def num_outputs_total(self) -> Optional[int]:
         return self._num_outputs

     def add_input(self, refs: RefBundle, input_index: int) -> None:
+        assert not self.completed()
         assert input_index == 0, input_index
         self._input_buffer.append(refs)

     def inputs_done(self, input_index: int) -> None:
         assert input_index == 0, input_index
         self._output_buffer, self._stats = self._bulk_fn(self._input_buffer)
         self._input_buffer.clear()
+        self._completed = True
+
+    def completed(self) -> bool:
+        return self._completed
Contributor: Why don't we need to check whether self.has_next() as we do in the map operator?

Author (ericl): Done (moved to physical op class).


     def has_next(self) -> bool:
         return len(self._output_buffer) > 0
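
For orientation: the `_bulk_fn` used above consumes every input RefBundle at once and returns a tuple of output bundles and a stats dict. A minimal sketch of constructing the operator, mirroring the `reverse_sort` transform from the new test file at the bottom of this page (the `range(4)` input data is illustrative only):

```python
from typing import List

from ray.data._internal.execution.interfaces import RefBundle
from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
from ray.data._internal.execution.util import make_ref_bundles


def reverse_sort(inputs: List[RefBundle]):
    # An all-to-all transform sees the full list of input bundles at once
    # and returns (output_bundles, stats_dict).
    return inputs[::-1], {}


# Feed the operator from a static buffer of pre-made bundles.
input_op = InputDataBuffer(make_ref_bundles([[x] for x in range(4)]))
op = AllToAllOperator(reverse_sort, input_op)
```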
python/ray/data/_internal/execution/operators/input_data_buffer.py
@@ -36,6 +36,9 @@ def has_next(self) -> bool:
     def get_next(self) -> RefBundle:
         return self._input_data.pop(0)

+    def completed(self) -> bool:
+        return not self.has_next()
+
     def num_outputs_total(self) -> Optional[int]:
         return self._num_outputs

python/ray/data/_internal/execution/operators/map_operator.py
@@ -63,6 +63,7 @@ def __init__(
         else:
             raise ValueError(f"Unsupported execution strategy {compute_strategy}")
         self._output_metadata: List[BlockMetadata] = []
+        self._inputs_done = False
         super().__init__(name, [input_op])

     def get_metrics(self) -> Dict[str, int]:
@@ -78,6 +79,7 @@ def add_input(self, refs: RefBundle, input_index: int) -> None:

     def inputs_done(self, input_index: int) -> None:
         self._execution_state.inputs_done(input_index)
+        self._inputs_done = True
Contributor: Is there an assumption here that there is only one input (add a note?)?

Author (ericl): Fixed.


     def has_next(self) -> bool:
         return self._execution_state.has_next()

@@ -94,6 +96,11 @@ def get_work_refs(self) -> List[ray.ObjectRef]:
     def notify_work_completed(self, task: ray.ObjectRef) -> None:
         self._execution_state.work_completed(task)

+    def completed(self) -> bool:
+        return (
+            self._inputs_done and len(self.get_work_refs()) == 0 and not self.has_next()
Contributor: Seems like this definition could be shared across the different operators?

Author (ericl): Done.

+        )

     def get_stats(self) -> StatsDict:
         return {self._name: self._output_metadata}
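
Per the exchange above, the shared definition was later hoisted into the PhysicalOperator base class. A hypothetical sketch of what that shared default could look like (this is not code from this commit; the `_inputs_done` bookkeeping on the base class is an assumption):

```python
from typing import List

import ray


class PhysicalOperator:
    """Sketch of a shared completed() default (hypothetical)."""

    def __init__(self) -> None:
        # Assumed bookkeeping: set once inputs_done() has been called
        # for every input index.
        self._inputs_done = False

    def get_work_refs(self) -> List[ray.ObjectRef]:
        return []  # operators with in-flight Ray tasks override this

    def has_next(self) -> bool:
        return False  # operators with buffered outputs override this

    def completed(self) -> bool:
        # Done only when no more inputs are coming, no Ray futures remain,
        # and every output bundle has been taken.
        return (
            self._inputs_done
            and len(self.get_work_refs()) == 0
            and not self.has_next()
        )
```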

20 changes: 16 additions & 4 deletions python/ray/data/_internal/execution/streaming_executor_state.py
@@ -3,7 +3,7 @@
 This is split out from streaming_executor.py to facilitate better unit testing.
 """

-from typing import Dict, List, Optional
+from typing import Dict, List, Set, Optional

 import ray
 from ray.data._internal.execution.interfaces import (
@@ -34,10 +34,11 @@ def __init__(self, op: PhysicalOperator):
         self.op = op
         self.progress_bar = None
         self.num_completed_tasks = 0
+        self.finalized_input_indices: Set[int] = set()

     def initialize_progress_bar(self, index: int) -> None:
         self.progress_bar = ProgressBar(
-            self.op.name, self.op.num_outputs_total(), index
+            self.op.name, self.op.num_outputs_total() or 1, index
         )

     def num_queued(self) -> int:
@@ -107,7 +108,7 @@ def process_completed_tasks(topology: Topology) -> bool:
     """Process any newly completed tasks and update operator state.

     Returns:
-        Whether there remain incomplete tasks in the topology.
+        Whether there remain incomplete work (Ray futures) in the topology.
     """
     for op_state in topology.values():
         op_state.refresh_progress_bar()
@@ -135,7 +136,18 @@ def process_completed_tasks(topology: Topology) -> bool:
         while op.has_next():
             op_state.add_output(op.get_next())

-    return len(active_tasks)
+    # Call inputs_done() on ops where no more inputs are coming.
+    for op, op_state in topology.items():
+        for i, dep in enumerate(op.input_dependencies):
+            if (
+                dep.completed()
+                and not topology[dep].outqueue
+                and i not in op_state.finalized_input_indices
+            ):
+                op.inputs_done(input_index=i)
+                op_state.finalized_input_indices.add(i)
+
+    return len(active_tasks) > 0


 def select_operator_to_run(topology: Topology) -> Optional[PhysicalOperator]:
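
For context on how these helpers fit together: the actual scheduling loop lives in streaming_executor.py and is not shown in this diff, so the loop structure below is an assumption, using only the helper names exercised by the test file at the end of this page. It repeatedly drains completed work (which also finalizes inputs, per the new block above), then dispatches one task, stopping once nothing is running and no operator is dispatchable:

```python
# Hypothetical driver loop, assuming a `topology: Topology` built by
# build_streaming_topology(); not the PR's actual implementation.
from ray.data._internal.execution.streaming_executor_state import (
    dispatch_next_task,
    process_completed_tasks,
    select_operator_to_run,
)


def run_to_completion(topology) -> None:
    while True:
        # Drain finished Ray futures and finalize inputs for operators
        # whose upstream dependencies have completed.
        work_remaining = process_completed_tasks(topology)
        op = select_operator_to_run(topology)
        if op is not None:
            dispatch_next_task(topology[op])
        elif not work_remaining:
            break  # nothing running, nothing dispatchable: done
```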
5 changes: 4 additions & 1 deletion python/ray/data/_internal/plan.py
@@ -357,7 +357,10 @@ def execute_to_iterator(
         # Since the generator doesn't run any code until we try to fetch the first
         # value, force execution of one bundle before we call get_stats().
         gen = iter(block_iter)
-        block_iter = itertools.chain([next(gen)], gen)
+        try:
+            block_iter = itertools.chain([next(gen)], gen)
+        except StopIteration:
+            pass
         return block_iter, executor.get_stats()

     def execute(
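
The fix above guards the common peek-ahead idiom: pull the first element to force execution, then stitch it back onto the stream. On an empty iterator, `next()` raises StopIteration, which previously escaped. A standalone sketch of the pattern (the helper name is hypothetical):

```python
import itertools
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def peek_and_chain(it: Iterable[T]) -> Iterator[T]:
    """Force the first element to be computed, then yield the full stream."""
    gen = iter(it)
    try:
        first = next(gen)  # side effects (e.g. stats collection) happen here
    except StopIteration:
        return iter(())  # empty input: nothing to prepend
    return itertools.chain([first], gen)


assert list(peek_and_chain([1, 2, 3])) == [1, 2, 3]
assert list(peek_and_chain([])) == []
```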
158 changes: 158 additions & 0 deletions python/ray/data/tests/test_streaming_executor.py
@@ -0,0 +1,158 @@
import pytest
import time
from unittest.mock import MagicMock

from typing import List, Any

import ray
from ray.data.context import DatasetContext
from ray.data._internal.execution.interfaces import ExecutionOptions, RefBundle
from ray.data._internal.execution.streaming_executor import StreamingExecutor
from ray.data._internal.execution.streaming_executor_state import (
    OpState,
    build_streaming_topology,
    process_completed_tasks,
    select_operator_to_run,
    dispatch_next_task,
)
from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator
from ray.data._internal.execution.operators.map_operator import MapOperator
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
from ray.data._internal.execution.util import make_ref_bundles


@ray.remote
def sleep():
    time.sleep(999)


def make_transform(block_fn):
    def map_fn(block_iter):
        for block in block_iter:
            yield block_fn(block)

    return map_fn


def ref_bundles_to_list(bundles: List[RefBundle]) -> List[List[Any]]:
    output = []
    for bundle in bundles:
        for block, _ in bundle.blocks:
            output.append(ray.get(block))
    return output


def test_build_streaming_topology():
    inputs = make_ref_bundles([[x] for x in range(20)])
    o1 = InputDataBuffer(inputs)
    o2 = MapOperator(make_transform(lambda block: [b * -1 for b in block]), o1)
    o3 = MapOperator(make_transform(lambda block: [b * 2 for b in block]), o2)
    topo, _ = build_streaming_topology(o3)
    assert len(topo) == 3, topo
    assert o1 in topo, topo
    assert not topo[o1].inqueues, topo
    assert topo[o1].outqueue == topo[o2].inqueues[0], topo
    assert topo[o2].outqueue == topo[o3].inqueues[0], topo


def test_process_completed_tasks():
    inputs = make_ref_bundles([[x] for x in range(20)])
    o1 = InputDataBuffer(inputs)
    o2 = MapOperator(make_transform(lambda block: [b * -1 for b in block]), o1)
    topo, _ = build_streaming_topology(o2)

    # Test processing output bundles.
    assert len(topo[o1].outqueue) == 0, topo
    assert process_completed_tasks(topo) is False
    assert len(topo[o1].outqueue) == 20, topo

    # Test processing completed work items.
    sleep_ref = sleep.remote()
    done_ref = ray.put("done")
    o2.get_work_refs = MagicMock(return_value=[sleep_ref, done_ref])
    o2.notify_work_completed = MagicMock()
    o2.inputs_done = MagicMock()
    assert process_completed_tasks(topo) is True
    o2.notify_work_completed.assert_called_once_with(done_ref)
    o2.inputs_done.assert_not_called()

    # Test input finalization.
    o2.get_work_refs = MagicMock(return_value=[done_ref])
    o2.notify_work_completed = MagicMock()
    o2.inputs_done = MagicMock()
    o1.completed = MagicMock(return_value=True)
    topo[o1].outqueue.clear()
    assert process_completed_tasks(topo) is False
    o2.notify_work_completed.assert_called_once_with(done_ref)
    o2.inputs_done.assert_called_once_with(input_index=0)


def test_select_operator_to_run():
    inputs = make_ref_bundles([[x] for x in range(20)])
    o1 = InputDataBuffer(inputs)
    o2 = MapOperator(make_transform(lambda block: [b * -1 for b in block]), o1)
    o3 = MapOperator(make_transform(lambda block: [b * 2 for b in block]), o2)
    topo, _ = build_streaming_topology(o3)

    # Test empty.
    assert select_operator_to_run(topo) == None

    # Test backpressure based on queue length between operators.
    topo[o1].outqueue.append("dummy1")
    assert select_operator_to_run(topo) == o2
    topo[o1].outqueue.append("dummy2")
    assert select_operator_to_run(topo) == o2
    topo[o2].outqueue.append("dummy3")
    assert select_operator_to_run(topo) == o3

    # Test backpressure includes num active tasks as well.
    topo[o3].num_active_tasks = MagicMock(return_value=2)
    assert select_operator_to_run(topo) == o2
    topo[o2].num_active_tasks = MagicMock(return_value=2)
    assert select_operator_to_run(topo) == o3


def test_dispatch_next_task():
    inputs = make_ref_bundles([[x] for x in range(20)])
    o1 = InputDataBuffer(inputs)
    o2 = MapOperator(make_transform(lambda block: [b * -1 for b in block]), o1)
    op_state = OpState(o2)
    o2.add_input = MagicMock()
    op_state.inqueues[0].append("dummy1")
    dispatch_next_task(op_state)
Contributor: Suggest running this multiple times so we can test indices other than 0.

Author (ericl): Added a test for multiple inputs, and a TODO for multiple indices.

    assert o2.add_input.called_once_with("dummy1")


def test_pipelined_execution():
    executor = StreamingExecutor(ExecutionOptions())
    inputs = make_ref_bundles([[x] for x in range(20)])
    o1 = InputDataBuffer(inputs)
    o2 = MapOperator(make_transform(lambda block: [b * -1 for b in block]), o1)
    o3 = MapOperator(make_transform(lambda block: [b * 2 for b in block]), o2)

    def reverse_sort(inputs: List[RefBundle]):
        reversed_list = inputs[::-1]
        return reversed_list, {}

    o4 = AllToAllOperator(reverse_sort, o3)
    it = executor.execute(o4)
    output = ref_bundles_to_list(it)
    expected = [[x * -2] for x in range(20)][::-1]
    assert output == expected, (output, expected)


# TODO(ekl) remove this test once we have the new backend on by default.
def test_e2e_streaming_sanity():
    raise NotImplementedError
    DatasetContext.get_current().new_execution_backend = True
    result = ray.data.range(5).map(lambda x: x + 1)
    assert result.take_all() == [1, 2, 3, 4, 5], result

    # Checks new executor was enabled.
    assert "obj_store_mem_alloc" in result.stats(), result.stats()


if __name__ == "__main__":
    import sys

    sys.exit(pytest.main(["-v", __file__]))
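
Following the review suggestion above, a test exercising a non-zero input index might look like the sketch below. This is purely hypothetical: no operator in this commit has more than one upstream dependency, so the operator is mocked, and the assumptions that OpState sizes its inqueues from op.input_dependencies and that dispatch_next_task routes a bundle from any non-empty inqueue are mine, not the PR's.

```python
from unittest.mock import MagicMock

from ray.data._internal.execution.streaming_executor_state import (
    OpState,
    dispatch_next_task,
)


def test_dispatch_next_task_nonzero_index():
    # Hypothetical: mock a two-input operator, since none exists in this PR.
    op = MagicMock()
    op.input_dependencies = [MagicMock(), MagicMock()]
    op_state = OpState(op)
    op_state.inqueues[1].append("dummy")  # only the second queue has data
    dispatch_next_task(op_state)
    # Assumed dispatch behavior: the bundle is routed to input index 1.
    op.add_input.assert_called_once_with("dummy", input_index=1)
```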