update waiting policy and add new args to control the serving time
mchen644 committed Oct 9, 2024
1 parent 713f92c commit 665e8dd
Showing 5 changed files with 24 additions and 94 deletions.
12 changes: 6 additions & 6 deletions benchmarks/1_serving_benchmark.sh
@@ -13,8 +13,8 @@ echo $COUNTER >$COUNTER_FILE
# start vllm server
pwd=`pwd`
# model_name="meta-llama/Llama-2-70b-chat-hf"
# model_name="meta-llama/Llama-2-13b-chat-hf"
model_name="mistralai/Mistral-7B-Instruct-v0.1" # 32000
model_name="meta-llama/Llama-2-13b-chat-hf"
# model_name="mistralai/Mistral-7B-Instruct-v0.1" # 32000
# model_name="EleutherAI/gpt-neox-20b"
# model_name="facebook/opt-6.7b"
dataset_name="sharegpt"
@@ -25,7 +25,7 @@ result_dir="/root/v1/vllm/benchmarks/result"
# scheduler_policy=(infer)
# swap_policies=(partial)
declare -a scheduler_swap_policies
# scheduler_swap_policies[0]="tfittradeoff partial"
scheduler_swap_policies[0]="tfittradeoff partial"
scheduler_swap_policies[1]="fcfs full"
# scheduler_swap_policies[2]="las full"
# scheduler_swap_policies[1]="tfittradeoff full"
@@ -41,7 +41,7 @@ max_num_seqs=256
swap_space=64
max_tokens=2048
iter_theshold=15

max_serving_time=60
# request_rates[0]=0.5
# request_rates[1]=1.0
# request_rates[2]=2.0
@@ -69,13 +69,13 @@ for i in {0..0}; do
CUDA_VISIBLE_DEVICES=$gpu_devices taskset -c 23-24 python3 -m vllm.entrypoints.openai.api_server \
--model $model_name --swap-space $swap_space --preemption-mode $preemption_mode --scheduler-policy $policy \
--enable-chunked-prefill --max-num-batched-tokens $max_tokens --iter-threshold $iter_theshold --max-num-seqs $max_num_seqs --swap-out-tokens-policy $swap_policy --swap-out-partial-rate $swap_out_partial_rate --execution-budget $iter_theshold \
--tensor-parallel-size 1 --gpu-memory-utilization $gpu_memory_utilization --disable-sliding-window --waiting-iter-base $waiting_iter --disable-log-requests >api_server_${policy}_${swap_policy}.log 2>&1 &
--tensor-parallel-size 1 --gpu-memory-utilization $gpu_memory_utilization --disable-sliding-window --waiting-iter-base $waiting_iter --disable-log-requests --max-serving-time $max_serving_time >api_server_${policy}_${swap_policy}.log 2>&1 &
pid=$!

# run benchmark and save the output to benchmark.log
python3 benchmark_serving.py --execution-counter $COUNTER --dataset-path $dataset_path \
--dataset-name $dataset_name --request-rate $request_rate \
--num-prompts 500 --request-duration 500 --sharegpt-output-len 2000 --model $model_name --scheduler-policy $policy \
--num-prompts 500 --request-duration $max_serving_time --sharegpt-output-len 2000 --model $model_name --scheduler-policy $policy \
--save-result --result-dir $result_dir \
--metadata swap_space=$swap_space preemption_mode=$preemption_mode \
scheduler_policy=$policy gpu_memory_utilization=$gpu_memory_utilization \
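The main functional change in this script is that the server's new `--max-serving-time` flag and the client's `--request-duration` are now driven by the same `max_serving_time` variable, so the server-side deadline and the benchmark window expire together. A minimal Python sketch of that coupling, using the 60-second budget, model, policy, and prompt count from the diff; everything else (paths, omitted flags) is illustrative, not the full command line from the script:

```python
import subprocess

# Shared wall-clock budget (seconds); both commands derive from it so the
# server-side deadline and the client-side benchmark window end together.
max_serving_time = 60

server_cmd = [
    "python3", "-m", "vllm.entrypoints.openai.api_server",
    "--model", "meta-llama/Llama-2-13b-chat-hf",
    "--scheduler-policy", "tfittradeoff",
    "--max-serving-time", str(max_serving_time),   # new server-side knob
]
bench_cmd = [
    "python3", "benchmark_serving.py",
    "--num-prompts", "500",
    "--request-duration", str(max_serving_time),   # now tied to the same budget
]

server = subprocess.Popen(server_cmd)        # launch the API server in the background
try:
    subprocess.run(bench_cmd, check=True)    # run the benchmark against it
finally:
    server.terminate()
```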
3 changes: 1 addition & 2 deletions vllm/config.py
@@ -675,7 +675,7 @@ def __init__(
iter_threshold: int = 1,
embedding_mode: Optional[bool] = False,
preemption_mode: Optional[str] = None,
waiting_iter_base: float = 1.0
waiting_iter_base: float = 1.0,
) -> None:
if max_num_batched_tokens is not None:
self.max_num_batched_tokens = max_num_batched_tokens
@@ -710,7 +710,6 @@ def __init__(
self.embedding_mode = embedding_mode
self.preemption_mode = preemption_mode
self.waiting_iter_base= waiting_iter_base

self._verify_args()

def _verify_args(self) -> None:
84 changes: 2 additions & 82 deletions vllm/core/policy.py
@@ -161,53 +161,32 @@ class TFITTradeoff(Policy):

def _get_running_priority(self,avg_priority_rate:float, seq_group: SequenceGroup):
priority_rate = seq_group.priority_rate
decode_length = sum(seq.get_output_len() for seq in seq_group.seqs_dict.values())
# decode_length = seq_group.seq_len
# priority = priority_rate * decode_length / seq_group.max_length
# avoid to swap long sequence or the sequence with high priority.
if priority_rate != -1000:
# priority = priority_rate * seq_group.seq_len / seq_group.max_length
priority = priority_rate * seq_group.seq_len / seq_group.max_length
# priority = priority_rate * seq_group.seq_len
# else:
# priority = 1*decode_length/seq_group.max_length
else:
max_eos_token_pos = max(
(max(seq.get_eos_token_pos()) for seq in seq_group.seqs_dict.values()),
default=-1,
)
min_eos_token_pos = min(
(min(seq.get_eos_token_pos()) for seq in seq_group.seqs_dict.values()),
default=-1,
)

if min_eos_token_pos > 0:
# seq_group.priority_rate = (32000-max_eos_token_pos) / 32000 # 32,768, 50432
seq_group.priority_rate = (32000-min_eos_token_pos) / 32000 # 32,768, 50432
# seq_group.priority_rate = max_eos_token_pos / 32000
# seq_group.priority_rate = max_eos_token_pos / 32000
priority = (
seq_group.priority_rate
* seq_group.seq_len
# * (seq_group.max_length - decode_length)
# * decode_length
/ seq_group.max_length
)
else:
priority = (
avg_priority_rate
# * (seq_group.max_length - decode_length)
* seq_group.seq_len
/ seq_group.max_length
)

return priority

def _get_waiting_priority(self, avg_priority_rate: float, seq_group: SequenceGroup, pending_swapped_rate: float):
# priority_rate = max(
# (max(seq.get_eos_token_pos()) for seq in seq_group.seqs_dict.values()),
# default=-1,
# )
priority_rate = min(
(min(seq.get_eos_token_pos()) for seq in seq_group.seqs_dict.values()),
default=-1,
@@ -219,78 +198,19 @@ def _get_waiting_priority(self, avg_priority_rate: float, seq_group: SequenceGro
)
if priority_rate > 0:
seq_group.priority_rate = (32000 - priority_rate) / 32000
# seq_group.priority_rate = max_eos_token_pos / 32000
priority = (
priority_rate*(1-pending_swapped_rate)
* ( seq_group.seq_len+ seq_group.metrics.waiting_iter_nums)
* (decode_length+ seq_group.metrics.waiting_iter_nums)
/ seq_group.max_length
)
# else
# priority = 1*decode_length/seq_group.max_length
else:
priority = (
avg_priority_rate
* (decode_length+ seq_group.metrics.waiting_iter_nums*pending_swapped_rate)
# * (seq_group.seq_len+ seq_group.metrics.waiting_iter_nums*(1-pending_swapped_rate))
* (seq_group.seq_len+ seq_group.metrics.waiting_iter_nums*pending_swapped_rate)
/ seq_group.max_length
# / seq_group.max_length
)
# if priority_rate > 0:
# seq_group.priority_rate = (32000 - priority_rate) / 32000
# # seq_group.priority_rate = max_eos_token_pos / 32000
# # seq_group.priority_rate = max_eos_token_pos / 32000
# priority = (
# seq_group.priority_rate * seq_group.seq_len
# / seq_group.max_length
# ) # long sequence has higher priority.
# else:
# priority = (
# avg_priority_rate*pending_swapped_rate
# # * (decode_length+ seq_group.metrics.waiting_iter_nums*pending_swapped_rate)
# * (decode_length + seq_group.metrics.waiting_iter_nums)
# / seq_group.max_length
# )

return priority

# def _get_waiting_priority(self, avg_priority_rate: float, seq_group: SequenceGroup, pending_swapped_rate: float):
# priority_rate = seq_group.priority_rate
# decode_length = sum(
# seq.get_output_len() for seq in seq_group.seqs_dict.values()
# )
# # decode_length = seq_group.seq_len
# # priority = priority_rate * decode_length / seq_group.max_length
# # avoid to swap long sequence or the sequence with high priority.
# if priority_rate != -1000:
# priority = (
# priority_rate
# * ( seq_group.seq_len+ seq_group.metrics.waiting_iter_nums/decode_length*(pending_swapped_rate))
# / seq_group.max_length
# )
# # else
# # priority = 1*decode_length/seq_group.max_length
# else:
# max_eos_token_pos = max(
# (max(seq.get_eos_token_pos()) for seq in seq_group.seqs_dict.values()),
# default=-1,
# )
# if max_eos_token_pos > 0:
# seq_group.priority_rate = (32000 - max_eos_token_pos) / 32000
# # seq_group.priority_rate = max_eos_token_pos / 32000
# priority = (
# seq_group.priority_rate * seq_group.seq_len
# / seq_group.max_length
# ) # long sequence has higher priority.
# else:
# priority = (
# avg_priority_rate
# # * (decode_length+ seq_group.metrics.waiting_iter_nums*pending_swapped_rate)
# * (decode_length + seq_group.metrics.waiting_iter_nums/seq_group.seq_len*(1-pending_swapped_rate))
# / seq_group.max_length
# # / seq_group.max_length
# )

# return priority

def got_priority(
self,
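The surviving `_get_running_priority` / `_get_waiting_priority` logic above scores each sequence group from what appears to be a predicted EOS-token position (normalized against a 32,000-entry vocabulary), the group's current length relative to its length budget, and, for waiting groups, the iterations already spent waiting, damped by how full the pending/swapped queues are. Below is a simplified standalone sketch of that scoring with the group state flattened into a plain dataclass; `GroupView`, `running_priority`, and `waiting_priority` are illustrative names, while the formulas mirror the diff:

```python
from dataclasses import dataclass

VOCAB_SIZE = 32000  # vocabulary size used in the diff to normalize the EOS position

@dataclass
class GroupView:
    priority_rate: float      # -1000 is the "no estimate yet" sentinel used in the diff
    seq_len: int              # current total sequence length
    max_length: int           # generation length budget
    decode_length: int        # tokens generated so far
    waiting_iter_nums: int    # scheduler iterations spent waiting
    min_eos_token_pos: int    # smallest predicted EOS position across sequences, -1 if unknown

def running_priority(g: GroupView, avg_priority_rate: float) -> float:
    """Higher score -> keep running (avoid swapping this group out)."""
    if g.priority_rate != -1000:
        return g.priority_rate * g.seq_len / g.max_length
    if g.min_eos_token_pos > 0:
        # Refresh the rate from the EOS estimate: an earlier EOS gives a higher rate.
        g.priority_rate = (VOCAB_SIZE - g.min_eos_token_pos) / VOCAB_SIZE
        return g.priority_rate * g.seq_len / g.max_length
    # No per-group estimate yet: fall back to the batch-average rate.
    return avg_priority_rate * g.seq_len / g.max_length

def waiting_priority(g: GroupView, avg_priority_rate: float,
                     pending_swapped_rate: float) -> float:
    """Score a waiting group; accumulated waiting adds credit, damped by queue pressure."""
    eos_pos = g.min_eos_token_pos
    if eos_pos > 0:
        g.priority_rate = (VOCAB_SIZE - eos_pos) / VOCAB_SIZE
        # As in the diff, the raw EOS position (not the normalized rate) enters the product.
        return (eos_pos * (1 - pending_swapped_rate)
                * (g.seq_len + g.waiting_iter_nums)
                * (g.decode_length + g.waiting_iter_nums)
                / g.max_length)
    return (avg_priority_rate
            * (g.decode_length + g.waiting_iter_nums * pending_swapped_rate)
            * (g.seq_len + g.waiting_iter_nums * pending_swapped_rate)
            / g.max_length)
```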
13 changes: 11 additions & 2 deletions vllm/engine/arg_utils.py
@@ -80,6 +80,7 @@ class EngineArgs:
waiting_iter_base: float = 1.0
swap_out_partial_rate: float = 0.5
execution_budget:int = 32768
max_serving_time: int = 600

# Related to Vision-language models such as llava
image_input_type: Optional[str] = None
@@ -556,7 +557,14 @@ def add_cli_args(
'before the engine preempts it. The engine will preempt the sequence if the execution budget is exceeded.'
'Also the execution budget is used to determine the maximum number of the waiting iterations before promoting into the running queue.'
)

parser.add_argument(
"--max-serving-time",
type=int,
default=EngineArgs.max_serving_time,
help='The maximum serving time of a sequence in seconds. '
'If the sequence exceeds this time, the engine will '
'terminate.'
)
parser.add_argument(
"--iter-threshold",
type=int,
@@ -837,7 +845,7 @@ class AsyncEngineArgs(EngineArgs):
engine_use_ray: bool = False
disable_log_requests: bool = False
max_log_len: Optional[int] = None

max_serving_time: int = 600
@staticmethod
def add_cli_args(parser: argparse.ArgumentParser,
async_args_only: bool = False) -> argparse.ArgumentParser:
@@ -856,6 +864,7 @@ def add_cli_args(parser: argparse.ArgumentParser,
help='Max number of prompt characters or prompt '
'ID numbers being printed in log.'
'\n\nDefault: Unlimited')
parser.add_argument("--max-serving-time", type=int, default=600, help="Max serving time in seconds")
return parser


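As the diff shows, both `EngineArgs` and `AsyncEngineArgs` now expose the serving-time budget: a `max_serving_time: int = 600` field plus a `--max-serving-time` CLI flag. A minimal, self-contained argparse sketch of the pattern being added (the parser and trimmed-down dataclass below are standalone illustrations, not vllm's actual argument plumbing):

```python
import argparse
from dataclasses import dataclass

@dataclass
class EngineArgs:
    # Wall-clock serving budget in seconds; 600 mirrors the default in the diff.
    max_serving_time: int = 600

def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    parser.add_argument(
        "--max-serving-time",
        type=int,
        default=EngineArgs.max_serving_time,
        help="Maximum serving time in seconds; once exceeded, the engine "
             "stops serving and winds down.",
    )
    return parser

parser = add_cli_args(argparse.ArgumentParser())
args = parser.parse_args(["--max-serving-time", "60"])
engine_args = EngineArgs(max_serving_time=args.max_serving_time)
print(engine_args.max_serving_time)  # -> 60
```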
6 changes: 4 additions & 2 deletions vllm/engine/async_llm_engine.py
@@ -427,6 +427,7 @@ def __init__(self,
log_requests: bool = True,
max_log_len: Optional[int] = None,
start_engine_loop: bool = True,
max_serving_time: int=600,
**kwargs) -> None:
self.worker_use_ray = worker_use_ray
self.engine_use_ray = engine_use_ray
Expand All @@ -435,7 +436,7 @@ def __init__(self,
self.background_loop_start_time = None
self.engine = self._init_engine(*args, **kwargs)
self.engine_start_time = time.time()

self.max_serving_time =max_serving_time
self.et = 0.0
self.background_loop: Optional[asyncio.Future] = None
# We need to keep a reference to unshielded
@@ -489,6 +490,7 @@ def from_engine_args(
log_stats=not engine_args.disable_log_stats,
max_log_len=engine_args.max_log_len,
start_engine_loop=start_engine_loop,
max_serving_time=engine_args.max_serving_time,
usage_context=usage_context,
)
return engine
@@ -596,7 +598,7 @@ async def engine_step(self) -> bool:
for request_output in request_outputs:
self._request_tracker.process_request_output(
request_output, verbose=self.log_requests)
if time.time()-self.engine_start_time > 600:
if time.time()-self.engine_start_time > self.max_serving_time:
self.engine.scheduler.reach_ddl = True
return len(request_outputs) > 0
# else:
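With this change, `engine_step` compares the elapsed wall-clock time against the configurable `self.max_serving_time` instead of a hard-coded 600 seconds, and flips the scheduler's `reach_ddl` flag once the budget is spent. A stripped-down sketch of that deadline pattern (the classes below are simplified stand-ins, not vllm's actual engine or scheduler):

```python
import time

class DeadlineScheduler:
    """Stand-in for the scheduler: only tracks the deadline flag."""
    def __init__(self) -> None:
        self.reach_ddl = False

class EngineLoop:
    def __init__(self, max_serving_time: int = 600) -> None:
        self.max_serving_time = max_serving_time   # wall-clock budget in seconds
        self.engine_start_time = time.time()       # recorded once at engine startup
        self.scheduler = DeadlineScheduler()

    def engine_step(self) -> None:
        # ... process one scheduling/decoding iteration here ...
        if time.time() - self.engine_start_time > self.max_serving_time:
            # Signal the scheduler that the serving deadline has been reached;
            # downstream logic can stop admitting or generating new work.
            self.scheduler.reach_ddl = True

loop = EngineLoop(max_serving_time=60)
loop.engine_step()   # call each iteration; sets reach_ddl once 60 s have elapsed
```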
