Add _process_model_outputs back
s5u13b committed Feb 7, 2025
1 parent: a51cf25 · commit: f2df197
Showing 2 changed files with 45 additions and 3 deletions.
44 changes: 42 additions & 2 deletions llumnix/backends/vllm/llm_engine.py
@@ -29,6 +29,7 @@
 from vllm.utils import Counter
 from vllm.usage.usage_lib import UsageContext
 from vllm.engine.output_processor.single_step import SingleStepOutputProcessor
+from vllm.engine.llm_engine import SchedulerContext
 
 from llumnix.logging.logger import init_logger
 from llumnix.instance_info import InstanceInfo
@@ -53,8 +54,8 @@ def _process_sequence_group_outputs(self,
                                         seq_group: SequenceGroup,
                                         outputs: SequenceGroupOutput,
                                         is_async: bool) -> None:
-        if RequestStatus.is_migrating(seq_group.status):
-            return
+        # if RequestStatus.is_migrating(seq_group.status):
+        #     return
         super()._process_sequence_group_outputs(seq_group, outputs, is_async)


@@ -129,6 +130,45 @@ def from_engine_args(
         )
         return engine
 
+    def _process_model_outputs(self,
+                               ctx: SchedulerContext,
+                               request_id: Optional[str] = None) -> None:
+        if len(ctx.output_queue) == 0:
+            return None
+
+        if request_id:
+            (outputs, seq_group_metadata_list, scheduler_outputs, is_async,
+             is_last_step, is_first_step_output, skip) = ctx.output_queue[0]
+        else:
+            (outputs, seq_group_metadata_list, scheduler_outputs, is_async,
+             is_last_step, is_first_step_output,
+             skip) = ctx.output_queue.popleft()
+
+        # Filter out outputs of migrating requests.
+        if outputs:
+            new_outputs = []
+            new_scheduled_seq_groups = []
+            new_seq_group_metadata_list = []
+            for scheduled_seq_group, seq_group_meta, seq_group_output in \
+                    zip(scheduler_outputs.scheduled_seq_groups, seq_group_metadata_list, outputs[0].outputs):
+                seq_group = scheduled_seq_group.seq_group
+                if seq_group.get_seqs(SequenceStatus.RUNNING):
+                    new_scheduled_seq_groups.append(scheduled_seq_group)
+                    new_seq_group_metadata_list.append(seq_group_meta)
+                    new_outputs.append(seq_group_output)
+            scheduler_outputs.scheduled_seq_groups = new_scheduled_seq_groups
+            outputs[0].outputs = new_outputs
+            seq_group_metadata_list = new_seq_group_metadata_list
+
+        if request_id:
+            ctx.output_queue[0] = (outputs, seq_group_metadata_list, scheduler_outputs, is_async,
+                                   is_last_step, is_first_step_output, skip)
+        else:
+            ctx.output_queue.appendleft((outputs, seq_group_metadata_list, scheduler_outputs, is_async,
+                                         is_last_step, is_first_step_output, skip))
+
+        return super()._process_model_outputs(ctx, request_id)
+
     def _process_request_outputs(
             self,
             outputs: List[Tuple[RequestOutput,ServerInfo]],
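Note on the override added above: it rewrites the head entry of ctx.output_queue in place, dropping per-request outputs for sequence groups that no longer have RUNNING sequences (i.e. requests migrated away between scheduling and output processing), then delegates to vLLM's own _process_model_outputs. The sketch below is a minimal, self-contained illustration of just that filtering step; FakeSeqGroup, FakeScheduledSeqGroup, and FakeSchedulerOutputs are hypothetical stand-ins for the real vLLM types, not part of either codebase.

    from dataclasses import dataclass
    from typing import Any, List, Tuple

    # Hypothetical stand-ins for vLLM's SequenceGroup / ScheduledSequenceGroup /
    # SchedulerOutputs, reduced to the fields the override actually touches.
    @dataclass
    class FakeSeqGroup:
        request_id: str
        running: bool  # stands in for bool(get_seqs(SequenceStatus.RUNNING))

    @dataclass
    class FakeScheduledSeqGroup:
        seq_group: FakeSeqGroup

    @dataclass
    class FakeSchedulerOutputs:
        scheduled_seq_groups: List[FakeScheduledSeqGroup]

    def filter_migrating(outputs: List[List[Any]],
                         seq_group_metadata_list: List[Any],
                         scheduler_outputs: FakeSchedulerOutputs) -> Tuple[List[Any], List[Any]]:
        # Keep only entries whose seq group still has running sequences; a
        # group with none was migrated away before output processing ran.
        kept = [(sg, meta, out)
                for sg, meta, out in zip(scheduler_outputs.scheduled_seq_groups,
                                         seq_group_metadata_list,
                                         outputs[0])
                if sg.seq_group.running]
        # Rebuild all three parallel lists together so they stay index-aligned.
        scheduler_outputs.scheduled_seq_groups = [sg for sg, _, _ in kept]
        outputs[0] = [out for _, _, out in kept]
        return outputs, [meta for _, meta, _ in kept]

    # Two scheduled requests; "r1" was migrated away mid-step and gets dropped.
    so = FakeSchedulerOutputs([FakeScheduledSeqGroup(FakeSeqGroup("r0", True)),
                               FakeScheduledSeqGroup(FakeSeqGroup("r1", False))])
    outs, metas = filter_migrating([["out-r0", "out-r1"]], ["m0", "m1"], so)
    print(outs, metas)  # [['out-r0']] ['m0']

Rebuilding scheduled_seq_groups, the metadata list, and the sampler outputs in a single pass is what keeps the three lists index-aligned, which is why the real override reconstructs all of them together before handing the entry back to the queue.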
4 changes: 3 additions & 1 deletion llumnix/backends/vllm/outputs.py
@@ -1,6 +1,6 @@
 from vllm.outputs import RequestOutput, RequestOutputFactory, EmbeddingRequestOutput
 
-from llumnix.backends.vllm.sequence import SequenceGroupLlumnix
+from llumnix.backends.vllm.sequence import SequenceGroupLlumnix, RequestStatus
 
 
 class LlumnixRequestOutputFactory(RequestOutputFactory):
@@ -10,5 +10,7 @@ def create(seq_group: SequenceGroupLlumnix, use_cache: bool = False):
         if hasattr(seq_group,
                    'embeddings') and seq_group.embeddings is not None:
             return EmbeddingRequestOutput.from_seq_group(seq_group), seq_group.server_info
+        # if RequestStatus.is_migrating(seq_group.status):
+        #     return None
         # pylint: disable=too-many-function-args
         return RequestOutput.from_seq_group(seq_group, use_cache), seq_group.server_info
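The outputs.py change only widens the import for now: with the migrating-request early-return commented out, create always yields a (RequestOutput, server_info) pair. A minimal sketch of a call site follows; route_output is a hypothetical helper standing in for however the engine actually delivers outputs back to the issuing API server, not a real Llumnix function.

    # Hypothetical consumer of LlumnixRequestOutputFactory. Unlike vLLM's
    # factory, create() returns a (RequestOutput, server_info) pair, so the
    # caller can route each output back to the server that issued the request.
    output, server_info = LlumnixRequestOutputFactory.create(seq_group, use_cache=False)
    if output is not None:  # relevant if the commented-out migrating check is restored
        route_output(output, server_info)  # illustrative stand-in for the delivery path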
