optimize benchmark scripts for autoscaler, add more logs #356

Merged: 5 commits, Nov 11, 2024
5 changes: 5 additions & 0 deletions .gitignore
@@ -33,3 +33,8 @@ __pycache__
docs/build/
!**/*.template.rst


# benchmark logs, results and figures
benchmarks/autoscaling/logs
benchmarks/autoscaling/output_stats
benchmarks/autoscaling/workload_plot
2 changes: 2 additions & 0 deletions benchmarks/autoscaling/bench_workload_generator.py
@@ -1,5 +1,6 @@
import logging
import math
import os
import random
from typing import List, Any

@@ -108,6 +109,7 @@ def plot_workload(workload_dict, interval_sec, output_path: str = None):
if output_path is None:
plt.show()
else:
os.makedirs(os.path.dirname(output_path), exist_ok=True)
plt.savefig(output_path)
logging.info(f'Saved workload plot to {output_path}')

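The os.makedirs call added above matters because plt.savefig does not create missing parent directories. A minimal standalone sketch of the same pattern, using a hypothetical output path rather than anything from this PR:

    import os

    import matplotlib
    matplotlib.use("Agg")  # render to a file without a display
    import matplotlib.pyplot as plt

    output_path = "workload_plot/example.png"  # hypothetical nested path
    plt.plot([0, 30, 60], [5, 12, 8])
    plt.title("requests per interval")
    # plt.savefig() raises FileNotFoundError if the parent directory does not exist,
    # so create it first, which is the guard the diff adds before saving the workload plot.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    plt.savefig(output_path)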
92 changes: 73 additions & 19 deletions benchmarks/autoscaling/benchmark.py
@@ -3,6 +3,7 @@
import os
import random
import time
from datetime import datetime
Collaborator commented:
@happyandslow Can you help review the benchmark changes?

from typing import List, Optional, Tuple, Dict

import openai
@@ -21,20 +22,48 @@
except ImportError:
from backend_request_func import get_tokenizer

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)

def setup_logging(log_filename, level=logging.INFO):
"""
Set up global logging. Logs are written to the specified file and echoed to the console.

:param log_filename: logging output file
"""

logging.basicConfig(
level=level,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
if not os.path.exists((log_dir := os.path.dirname(log_filename))):
os.makedirs(log_dir, exist_ok=True)

logger = logging.getLogger()
logger.setLevel(level)

# create a handler to file
file_handler = logging.FileHandler(log_filename)
file_handler.setLevel(level)

# create a handler to console
console_handler = logging.StreamHandler()
console_handler.setLevel(level)

formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
logger.addHandler(file_handler)

logging.info(f"save log to {log_filename}")


# Function to wrap the prompt into OpenAI's chat completion message format.
def wrap_prompt_as_chat_message(prompt: str):
"""
Wrap the prompt into OpenAI's chat completion message format.

:param prompt: The user prompt to be converted.
:return: A list containing chat completion messages.
"""
@@ -96,6 +125,7 @@ def build_openai_endpoints(deployment_plan):
# Asynchronous request handler
async def send_request(client, model, endpoint, prompt, output_file):
start_time = asyncio.get_event_loop().time()
start_ts = time.time()
try:
response = await client.chat.completions.create(
model=model,
@@ -113,24 +143,34 @@ async def send_request(client, model, endpoint, prompt, output_file):
result = {
"model": model,
"endpoint": endpoint,
"start_timestamp": start_ts,
"latency": latency,
"output": output_text,
"prompt_tokens": prompt_tokens,
"output_tokens": output_tokens,
"total_tokens": total_tokens,
"latency": latency,
"throughput": throughput
"throughput": throughput,
}

# Write result to JSONL file
output_file.write(json.dumps(result) + "\n")
output_file.flush() # Ensure data is written immediately to the file

logging.info(
f"Request for {model} completed in {latency:.2f} seconds with throughput {throughput:.2f} tokens/s, answer: {output_text[:30]}...")
return result
except Exception as e:
logging.error(f"Error sending request to {model} at {endpoint}: {str(e)}")
result = {
"model": model,
"endpoint": endpoint,
"start_timestamp": start_ts,
"latency": asyncio.get_event_loop().time() - start_time,
"start_time": start_time,
"error": str(e),
}
return None
finally:
# Write result to JSONL file
output_file.write(json.dumps(result) + "\n")
output_file.flush() # Ensure data is written immediately to the file



# Benchmark requests and log results into the specified file
@@ -195,12 +235,17 @@ def sample_sharegpt_requests(


def main(args):
WORKLOAD_MODE = args.concurrency is not None
tokenizer = get_tokenizer(args.model, trust_remote_code=True)
if args.dataset_path is not None:
logging.info(f"Start to sample {args.num_prompts} prompts from {args.dataset_path}")
num_prompts = args.num_prompts
if WORKLOAD_MODE:
# average bar height (w_B) * number of intervals (duration / interval)
num_prompts = int(args.w_B * args.w_duration_sec / args.w_interval_sec)
input_requests = sample_sharegpt_requests(
dataset_path=args.dataset_path,
num_requests=args.num_prompts,
num_requests=num_prompts,
tokenizer=tokenizer,
fixed_output_len=args.sharegpt_output_len,
)
@@ -214,7 +259,7 @@ def main(args):
openai_clients = build_openai_clients(openai_endpoints)

start_time = time.time()
if args.concurrency is not None:
if WORKLOAD_MODE:
logging.info(f"Starting benchmark for {args.num_prompts} prompts with deployment {args.deployment_endpoints}")
asyncio.run(benchmark_requests(openai_clients, openai_endpoints, input_requests, args.num_prompts, args.concurrency,
args.output_file_path))
@@ -230,8 +275,7 @@ def main(args):
)
# if you want to see the workload traffic trend
plot_workload({"llm": workloads}, interval_sec=interval_sec,
output_path=f"workload_A{args.w_A}_B{args.w_B}_P{args.w_period}_"
f"D{args.w_duration_sec}s_I{interval_sec}s.png")
output_path=f"workload_plot/{identifier}.png")
next_start = start_time + interval_sec
for idx, each_input_requests in enumerate(workloads):

@@ -242,7 +286,7 @@ def main(args):
asyncio.run(benchmark_requests(openai_clients, openai_endpoints, each_input_requests, len(each_input_requests),
len(each_input_requests), args.output_file_path))

# wait until passing args.w_interval_sec
# wait until passing args.w_interval_sec
wait_time = next_start - time.time()
if wait_time > 0:
time.sleep(wait_time)
@@ -275,4 +319,14 @@ def main(args):

parser = EngineArgs.add_cli_args(parser)
args = parser.parse_args()

if args.concurrency is not None:
identifier = f"onestep_np{args.num_prompts}_c{args.concurrency}"
else:
identifier = f"workload_A{args.w_A}_B{args.w_B}_P{args.w_period}_D{args.w_duration_sec}s_I{args.w_interval_sec}s"
identifier += "_" + datetime.now().strftime("%Y%m%d_%H%M%S")
args.output_file_path = f"output_stats/output_{identifier}.jsonl"
os.makedirs(os.path.dirname(args.output_file_path), exist_ok=True)

setup_logging(f'logs/bench_{identifier}.log')
main(args)
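With this change every request lands in the JSONL output file whether it succeeds or fails, so a run can be summarized afterwards. A minimal post-processing sketch, assuming a file that follows the output_stats/output_<identifier>.jsonl naming above (the exact file name here is only an example):

    import json

    path = "output_stats/output_onestep_np100_c10_20241111_120000.jsonl"  # example name only
    ok, failed = [], []
    with open(path) as f:
        for line in f:
            rec = json.loads(line)
            # failed requests carry an "error" field, successful ones carry latency/throughput
            (failed if "error" in rec else ok).append(rec)

    if ok:
        latencies = sorted(r["latency"] for r in ok)
        p50 = latencies[len(latencies) // 2]
        mean_tput = sum(r["throughput"] for r in ok) / len(ok)
        print(f"{len(ok)} succeeded, {len(failed)} failed, "
              f"p50 latency {p50:.2f}s, mean throughput {mean_tput:.2f} tok/s")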
2 changes: 1 addition & 1 deletion benchmarks/autoscaling/kpa.yaml
@@ -14,5 +14,5 @@ spec:
minReplicas: 1
maxReplicas: 10
targetMetric: "vllm:gpu_cache_usage_perc"
targetValue: "50"
targetValue: "0.5"
scalingStrategy: "KPA"
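For context on the targetValue change: vLLM reports vllm:gpu_cache_usage_perc as a fraction between 0 and 1, so a target of "50" can never be exceeded and no scale-up would ever trigger. Assuming the KPA strategy follows the usual Knative-style computation (desired ≈ ceil(observed_total / target)), three ready pods each reporting 0.8 cache usage give ceil(2.4 / 0.5) = 5 desired replicas with the new target, while the old value of 50 would keep the deployment at minReplicas.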
2 changes: 1 addition & 1 deletion pkg/controller/podautoscaler/podautoscaler_controller.go
@@ -72,7 +72,7 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
Scheme: mgr.GetScheme(),
EventRecorder: mgr.GetEventRecorderFor("PodAutoscaler"),
Mapper: mgr.GetRESTMapper(),
resyncInterval: 30 * time.Second, // TODO: this should be override by an environment variable
resyncInterval: 10 * time.Second, // TODO: this should be override by an environment variable
eventCh: make(chan event.GenericEvent),
}

11 changes: 10 additions & 1 deletion pkg/controller/podautoscaler/scaler/kpa.go
@@ -202,6 +202,14 @@ func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.Name

isOverPanicThreshold := dppc/readyPodsCount >= spec.PanicThreshold

klog.V(4).InfoS("--- KPA Details", "readyPodsCount", readyPodsCount,
"MaxScaleUpRate", spec.MaxScaleUpRate, "MaxScaleDownRate", spec.MaxScaleDownRate,
"TargetValue", spec.TargetValue, "PanicThreshold", spec.PanicThreshold,
"StableWindow", spec.StableWindow, "ScaleDownDelay", spec.ScaleDownDelay,
"dppc", dppc, "dspc", dspc, "desiredStablePodCount", desiredStablePodCount,
"PanicThreshold", spec.PanicThreshold, "isOverPanicThreshold", isOverPanicThreshold,
)

if k.panicTime.IsZero() && isOverPanicThreshold {
// Begin panicking when we cross the threshold in the panic window.
klog.InfoS("Begin panicking")
@@ -221,10 +229,10 @@ func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.Name
// In some edgecases stable window metric might be larger
// than panic one. And we should provision for stable as for panic,
// so pick the larger of the two.
klog.InfoS("Operating in panic mode.", "desiredPodCount", desiredPodCount, "desiredPanicPodCount", desiredPanicPodCount)
if desiredPodCount < desiredPanicPodCount {
desiredPodCount = desiredPanicPodCount
}
klog.InfoS("Operating in panic mode.")
// We do not scale down while in panic mode. Only increases will be applied.
if desiredPodCount > k.maxPanicPods {
klog.InfoS("Increasing pods count.", "originalPodCount", originalReadyPodsCount, "desiredPodCount", desiredPodCount)
@@ -243,6 +251,7 @@ func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.Name
// not the same in the case where two Scale()s happen in the same time
// interval (because the largest will be picked rather than the most recent
// in that case).
klog.V(4).InfoS("DelayWindow details", "delayWindow", k.delayWindow.String())
if k.delayWindow != nil {
k.delayWindow.Record(now, float64(desiredPodCount))
delayedPodCount, err := k.delayWindow.Max()
6 changes: 5 additions & 1 deletion pkg/utils/pod.go
@@ -20,6 +20,8 @@ import (
"context"
"fmt"

"k8s.io/klog/v2"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -128,9 +130,11 @@ func CountReadyPods(podList *v1.PodList) (int64, error) {

readyPodCount := 0
for _, pod := range podList.Items {
if pod.Status.Phase == v1.PodRunning && IsPodReady(&pod) {
isReady := IsPodReady(&pod)
if pod.Status.Phase == v1.PodRunning && isReady {
readyPodCount++
}
klog.V(4).InfoS("CountReadyPods Pod status", "name", pod.Name, "phase", pod.Status.Phase, "ready", isReady)
}

return int64(readyPodCount), nil