Skip to content

Commit

Permalink
feat: Sequentially materialize left and right sides during hash join (#3735)
Browse files Browse the repository at this point in the history

Run left / right child in a hash join sequentially instead of in
lock-step. This prevents shuffles from executing in parallel, reducing
risk of spillage.

Additionally, emit finalized hash join steps as they are ready, and then
return outputs of the hash join in partition order.

### Results (TPCH SF 1000, 4 x i8g.4xlarge):

#### After:
Daft Q1 took 29.05 seconds
Daft Q2 took 28.42 seconds
Daft Q3 took 42.57 seconds
Daft Q4 took 19.79 seconds
Daft Q5 took 141.80 seconds
Daft Q6 took 11.00 seconds
Daft Q7 took 66.04 seconds
Daft Q8 took 128.28 seconds
Daft Q9 took 254.26 seconds
Daft Q10 took 43.72 seconds

Total time: 12m 45s
Spilled 1882432 MiB

#### Before:

Q1 took 31.05 seconds
Q2 took 24.95 seconds
Q3 took 50.91 seconds
Q4 took 24.11 seconds
Q5 took 177.07 seconds
Q6 took 11.17 seconds
Q7 took 75.97 seconds
Q8 took 150.76 seconds
Q9 took 263.51 seconds
Q10 took 59.37 seconds

Total time: 14m 29s
Spilled 2200948 MiB

**318,516 MiB less spillage, a 14.4% decrease.
1 minute and 44 seconds faster, a 12% decrease.**

---------

Co-authored-by: Colin Ho <colinho@Colins-MBP.localdomain>
Co-authored-by: Colin Ho <colinho@Colins-MacBook-Pro.local>
  • Loading branch information
3 people authored Feb 13, 2025
1 parent 00ef3bf commit 246e3e9
Showing 1 changed file with 98 additions and 61 deletions.
159 changes: 98 additions & 61 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,40 +344,44 @@ def hash_join(
how: JoinType,
) -> InProgressPhysicalPlan[PartitionT]:
"""Hash-based pairwise join the partitions from `left_child_plan` and `right_child_plan` together."""
# Materialize the steps from the left and right sources to get partitions.
# As the materializations complete, emit new steps to join each left and right partition.
left_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque()
right_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id = next(stage_id_counter)
yield_left = True
left_tasks: dict[int, SingleOutputPartitionTask[PartitionT]] = {}
right_tasks: dict[int, SingleOutputPartitionTask[PartitionT]] = {}

while True:
# Emit new join steps if we have left and right partitions ready.
while (
len(left_requests) > 0 and len(right_requests) > 0 and left_requests[0].done() and right_requests[0].done()
):
next_left = left_requests.popleft()
next_right = right_requests.popleft()
left_stage_id = next(stage_id_counter)
right_stage_id = next(stage_id_counter)
hash_join_stage_id = next(stage_id_counter)

# Calculate memory request for task.
left_size_bytes = next_left.partition_metadata().size_bytes
right_size_bytes = next_right.partition_metadata().size_bytes
if left_size_bytes is None and right_size_bytes is None:
size_bytes = None
elif left_size_bytes is None and right_size_bytes is not None:
# Use 2x the right side as the memory request, assuming that left and right side are ~ the same size.
size_bytes = 2 * right_size_bytes
elif right_size_bytes is None and left_size_bytes is not None:
# Use 2x the left side as the memory request, assuming that left and right side are ~ the same size.
size_bytes = 2 * left_size_bytes
elif left_size_bytes is not None and right_size_bytes is not None:
size_bytes = left_size_bytes + right_size_bytes

join_step = PartitionTaskBuilder[PartitionT](
inputs=[next_left.partition(), next_right.partition()],
partial_metadatas=[next_left.partition_metadata(), next_right.partition_metadata()],
# First, fully materialize the left side of the join
for step in left_plan:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output(stage_id=left_stage_id)
left_tasks[len(left_tasks)] = step
yield step

def create_join_step(
left_task: SingleOutputPartitionTask[PartitionT], right_task: SingleOutputPartitionTask[PartitionT]
) -> SingleOutputPartitionTask[PartitionT]:
"""Helper function to create a join step for a pair of tasks."""
left_size_bytes = left_task.partition_metadata().size_bytes
right_size_bytes = right_task.partition_metadata().size_bytes

# Calculate memory request for task
if left_size_bytes is None and right_size_bytes is None:
size_bytes = None
elif left_size_bytes is None and right_size_bytes is not None:
size_bytes = 2 * right_size_bytes # Assume left ≈ right size
elif right_size_bytes is None and left_size_bytes is not None:
size_bytes = 2 * left_size_bytes # Assume right ≈ left size
elif left_size_bytes is not None and right_size_bytes is not None:
size_bytes = left_size_bytes + right_size_bytes

join_step = (
PartitionTaskBuilder[PartitionT](
inputs=[left_task.partition(), right_task.partition()],
partial_metadatas=[left_task.partition_metadata(), right_task.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=size_bytes),
).add_instruction(
)
.add_instruction(
instruction=execution_step.HashJoin(
left_on=left_on,
right_on=right_on,
Expand All @@ -386,40 +390,73 @@ def hash_join(
is_swapped=False,
)
)
yield join_step
.finalize_partition_task_single_output(stage_id=hash_join_stage_id)
)
return join_step

# Exhausted all ready inputs; execute a single child step to get more join inputs.
# Choose whether to execute from left child or right child (whichever one is more behind)
if len(left_requests) < len(right_requests):
next_plan, next_requests = left_plan, left_requests
elif len(left_requests) > len(right_requests):
next_plan, next_requests = right_plan, right_requests
elif len(left_requests) == len(right_requests):
# Both plans have progressed equally; alternate between the two plans to avoid starving either one
next_plan, next_requests = (left_plan, left_requests) if yield_left else (right_plan, right_requests)
yield_left = not yield_left
join_tasks: dict[int, SingleOutputPartitionTask[PartitionT]] = {}
right_partition_counter = 0
next_join_partition_to_emit = 0
while True:
# Check if we have any join tasks that are ready to be emitted
while next_join_partition_to_emit in join_tasks and join_tasks[next_join_partition_to_emit].done():
to_emit = join_tasks.pop(next_join_partition_to_emit)
size_bytes = to_emit.partition_metadata().size_bytes
yield PartitionTaskBuilder[PartitionT](
inputs=[to_emit.partition()],
partial_metadatas=[to_emit.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=size_bytes),
)
next_join_partition_to_emit += 1

try:
step = next(next_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output(stage_id=stage_id)
next_requests.append(step)
yield step
# Find all partitions that are ready to be joined
ready_partitions = [
partition_num
for partition_num in left_tasks.keys() & right_tasks.keys() # Intersection of keys
if left_tasks[partition_num].done() and right_tasks[partition_num].done()
]

except StopIteration:
# Left and right child plans have completed.
# Are we still waiting for materializations to complete? (We will emit more joins from them).
if len(left_requests) + len(right_requests) > 0:
logger.debug(
"join blocked on completion of sources.\n Left sources: %s\nRight sources: %s",
left_requests,
right_requests,
)
yield None
if len(ready_partitions) > 0:
# Process all ready pairs
for partition in ready_partitions:
left_task = left_tasks.pop(partition)
right_task = right_tasks.pop(partition)
join_task = create_join_step(left_task, right_task)
join_tasks[partition] = join_task
yield join_task
else:
try:
# Process next right plan step
step = next(right_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output(stage_id=right_stage_id)
right_tasks[right_partition_counter] = step
right_partition_counter += 1
yield step

# Otherwise, we are entirely done.
else:
return
except StopIteration:
if left_tasks or right_tasks:
logger.debug(
"join blocked on completion of sources.\n Left sources: %s\nRight sources: %s",
left_tasks,
right_tasks,
)
yield None
else:
break

# Emit the remaining join tasks in order of partition number
while len(join_tasks) > 0:
while not join_tasks[next_join_partition_to_emit].done():
yield None
to_emit = join_tasks.pop(next_join_partition_to_emit)
size_bytes = to_emit.partition_metadata().size_bytes
yield PartitionTaskBuilder[PartitionT](
inputs=[to_emit.partition()],
partial_metadatas=[to_emit.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=size_bytes),
)
next_join_partition_to_emit += 1


def _create_broadcast_join_step(
Expand Down

0 comments on commit 246e3e9

Please sign in to comment.