Skip to content

Commit

Permalink
Fix bugs for shared blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
blinkbear committed Nov 22, 2024
1 parent ef84d8f commit ea8ea04
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
9 changes: 9 additions & 0 deletions vllm/core/block_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,15 @@ def can_swap_in(self,
return AllocStatus.OK
else:
return AllocStatus.LATER
def can_swap_in_shared_blocks(self, seq_group: SequenceGroup) -> AllocStatus:
    """Check whether the free shared GPU blocks can cover ``seq_group``.

    One free block of type ``"shared"`` is required for every sequence of
    the group whose status is ``SequenceStatus.RUNNING``.

    Args:
        seq_group: The sequence group to check.

    Returns:
        ``AllocStatus.OK`` when the GPU allocator reports at least as many
        free shared blocks as the group has running sequences, otherwise
        ``AllocStatus.NEVER``.

    NOTE(review): the result is an ``AllocStatus`` member, not a bool.
    Presumably ``AllocStatus`` is a plain ``enum.Enum``, whose members are
    all truthy -- callers should compare against ``AllocStatus.OK``
    explicitly rather than use the result as a condition. TODO confirm at
    the call sites.
    """
    # The requirement is driven by the group's RUNNING sequences.
    num_running_seqs = seq_group.num_seqs(status=SequenceStatus.RUNNING)
    free_shared_blocks = self.gpu_allocator.get_num_free_blocks(
        block_type="shared")

    if free_shared_blocks < num_running_seqs:
        return AllocStatus.NEVER
    return AllocStatus.OK

def _swap_block_table(
self, block_table: BlockTable, src_allocator: BlockAllocatorBase,
Expand Down
19 changes: 12 additions & 7 deletions vllm/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def __init__(
# self.num_shared_blocks = min(int(self.scheduler_config.max_num_batched_tokens // self.cache_config.block_size // self.scheduler_config.waiting_iter_base),
# int(self.cache_config.num_gpu_blocks*0.1)+1)
# else:
self.num_shared_blocks = 0
self.num_shared_blocks = 2


# Create the block space manager.
Expand Down Expand Up @@ -1735,16 +1735,11 @@ def _schedule_prefills(
# If the sequence group cannot be allocated, stop.
can_allocate = self.block_manager.can_allocate(seq_group)
if can_allocate == AllocStatus.LATER:
# if self.scheduler_config.policy == "tfittradeoff": # Debug
# leftover_waiting_sequences.appendleft(seq_group)
# waiting_queue.popleft()
# continue
# else:
if self.scheduler_config.policy != "tfittradeoff":
break
num_new_seqs = seq_group.get_max_num_running_seqs()
can_allocate_shared = self.block_manager.can_allocate(seq_group, shared=True)
if can_allocate_shared == AllocStatus.LATER:
if can_allocate_shared == AllocStatus.LATER or seq_group.is_prefill():
waiting_queue.popleft()
leftover_waiting_sequences.appendleft(seq_group)
continue
Expand All @@ -1758,6 +1753,7 @@ def _schedule_prefills(
if seq_group.swap_out_moment is not None:
self.total_swap_out_waiting_time += time.time(
) - seq_group.swap_out_moment
print(f"seq_group is prefill: {seq_group.is_prefill()}")
kv_free_seq_groups.append(
ScheduledSequenceGroup(seq_group=seq_group,
token_chunk_size=num_new_tokens))
Expand Down Expand Up @@ -2098,6 +2094,12 @@ def _schedule_chunked_prefill(self):
self.running.extend(
[s.seq_group for s in swapped_in.prefill_seq_groups])

print(f"running_scheduled.decode_seq_groups {len(running_scheduled.decode_seq_groups)}, \
running_scheduled.prefill_seq_groups {len(running_scheduled.prefill_seq_groups)}, \
swapped_in.decode_seq_groups {len(swapped_in.decode_seq_groups)}, \
swapped_in.prefill_seq_groups {len(swapped_in.prefill_seq_groups)}, \
prefills.seq_groups {len(prefills.seq_groups)}, \
prefills.kv_free_seq_groups {len(prefills.kv_free_seq_groups)}")
self.kv_free_seq_groups = [s.seq_group.request_id for s in prefills.kv_free_seq_groups]
# Update swapped requests.
self.swapped = remaining_swapped
Expand Down Expand Up @@ -2394,6 +2396,9 @@ def _preempt(
else:
preemption_mode = PreemptionMode.SWAP

elif self.block_manager.can_swap_in_shared_blocks(seq_group):
print(f"{seq_group.request_id} can swap in shared blocks")
preemption_mode = PreemptionMode.RECOMPUTE
elif self.user_specified_preemption_mode == "swap" and self.block_manager.can_swap_out(
seq_group):
preemption_mode = PreemptionMode.SWAP
Expand Down

0 comments on commit ea8ea04

Please sign in to comment.