Skip to content

Commit

Permalink
perf: Add a ray wait on metadatas when awaiting for tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Dec 12, 2024
1 parent 6ae4e77 commit 6adae15
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
14 changes: 14 additions & 0 deletions daft/execution/execution_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ def set_result(self, result: list[MaterializedResult[PartitionT]]) -> None:
"""
raise NotImplementedError

def get_results(self) -> list[MaterializedResult[PartitionT]]:
"""Gets the results of this Task.
NOTE: A PartitionTask may contain a `result` without being `.done()`. This is because
results can potentially contain futures which are yet to be completed.
"""
raise NotImplementedError

Check warning on line 92 in daft/execution/execution_step.py

View check run for this annotation

Codecov / codecov/patch

daft/execution/execution_step.py#L92

Added line #L92 was not covered by tests

def is_empty(self) -> bool:
"""Whether this partition task is guaranteed to result in an empty partition."""
return len(self.partial_metadatas) > 0 and all(meta.num_rows == 0 for meta in self.partial_metadatas)
Expand Down Expand Up @@ -207,6 +215,9 @@ def set_result(self, result: list[MaterializedResult[PartitionT]]) -> None:
[partition] = result
self._result = partition

def get_results(self) -> list[MaterializedResult[PartitionT]]:
return [self._result]

def result(self) -> MaterializedResult[PartitionT]:
assert self._result is not None, "Cannot call .result() on a PartitionTask that is not done"
return self._result
Expand Down Expand Up @@ -254,6 +265,9 @@ def set_result(self, result: list[MaterializedResult[PartitionT]]) -> None:
assert self._results is None, f"Cannot set result twice. Result is already {self._results}"
self._results = result

def get_results(self) -> list[MaterializedResult[PartitionT]]:
return self._results

def cancel(self) -> None:
if self._results is not None:
for result in self._results:
Expand Down
16 changes: 16 additions & 0 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,16 @@ def _await_tasks(
task_id = inflight_ref_to_task_id[ready]
runner_tracer.task_received_as_ready(task_id, inflight_tasks[task_id].stage_id)

# Run a .wait on the metadatas to retrieve them locally so that subsequent accesses will be faster
ready_metadatas = list(
{
result.get_metadata_objref()
for ready in readies
for result in inflight_tasks[inflight_ref_to_task_id[ready]].get_results()
}
)
ray.wait(ready_metadatas, fetch_local=True, num_returns=len(ready_metadatas), timeout=5)

return readies

def _is_active(self, execution_id: str):
Expand Down Expand Up @@ -1364,6 +1374,9 @@ def metadata(self) -> PartitionMetadata:
def cancel(self) -> None:
return ray.cancel(self._partition)

def get_metadata_objref(self) -> ray.ObjectRef:
return self._metadatas.get_objref()

def _noop(self, _: ray.ObjectRef) -> None:
return None

Expand All @@ -1383,6 +1396,9 @@ def _get_metadatas(self) -> list[PartitionMetadata]:
def get_index(self, key) -> PartitionMetadata:
return self._get_metadatas()[key]

def get_objref(self) -> ray.ObjectRef:
return self._ref

@classmethod
def from_metadata_list(cls, meta: list[PartitionMetadata]) -> PartitionMetadataAccessor:
ref = ray.put(meta)
Expand Down

0 comments on commit 6adae15

Please sign in to comment.