Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[data] New executor [7/n]--- bare bones implementation of StreamingExecutor #31579

Merged
merged 24 commits into from
Jan 17, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add sanity test
ericl committed Jan 12, 2023
commit b4147b52ba4a741e1fce7048abff90cf5836dade
29 changes: 22 additions & 7 deletions python/ray/data/tests/test_streaming_executor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import asyncio
import time
from unittest.mock import MagicMock

@@ -95,7 +96,7 @@ def test_select_operator_to_run():
topo, _ = build_streaming_topology(o3)

# Test empty.
assert select_operator_to_run(topo) == None
assert select_operator_to_run(topo) is None

# Test backpressure based on queue length between operators.
topo[o1].outqueue.append("dummy1")
@@ -143,13 +144,27 @@ def reverse_sort(inputs: List[RefBundle]):

# TODO(ekl) remove this test once we have the new backend on by default.
def test_e2e_streaming_sanity():
raise NotImplementedError
DatasetContext.get_current().new_execution_backend = True
result = ray.data.range(5).map(lambda x: x + 1)
assert result.take_all() == [1, 2, 3, 4, 5], result

# Checks new executor was enabled.
assert "obj_store_mem_alloc" in result.stats(), result.stats()
DatasetContext.get_current().use_streaming_executor = True

@ray.remote
class Barrier:
async def admit(self, x):
if x == 4:
print("Not allowing 4 to pass")
await asyncio.sleep(999)
else:
print(f"Allowing {x} to pass")

barrier = Barrier.remote()

def f(x):
ray.get(barrier.admit.remote(x))
return x + 1

# Check we can take the first items even if the last one gets stuck.
result = ray.data.range(5, parallelism=5).map(f)
assert result.take(4) == [1, 2, 3, 4]


if __name__ == "__main__":