diff --git a/.gitignore b/.gitignore
index 62b9e697..8abb7959 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,3 +33,8 @@ __pycache__
 docs/build/
 
 !**/*.template.rst
+
+# benchmark logs, results and figs
+benchmarks/autoscaling/logs
+benchmarks/autoscaling/output_stats
+benchmarks/autoscaling/workload_plot
\ No newline at end of file
diff --git a/benchmarks/autoscaling/bench_workload_generator.py b/benchmarks/autoscaling/bench_workload_generator.py
index 2f8e3d4b..079992a0 100644
--- a/benchmarks/autoscaling/bench_workload_generator.py
+++ b/benchmarks/autoscaling/bench_workload_generator.py
@@ -1,5 +1,6 @@
 import logging
 import math
+import os
 import random
 from typing import List, Any
 
@@ -108,6 +109,7 @@ def plot_workload(workload_dict, interval_sec, output_path: str = None):
     if output_path is None:
         plt.show()
     else:
+        os.makedirs(os.path.dirname(output_path), exist_ok=True)
         plt.savefig(output_path)
         logging.info(f'Saved workload plot to {output_path}')
 
diff --git a/benchmarks/autoscaling/benchmark.py b/benchmarks/autoscaling/benchmark.py
index 05e21149..f6834aea 100644
--- a/benchmarks/autoscaling/benchmark.py
+++ b/benchmarks/autoscaling/benchmark.py
@@ -3,6 +3,7 @@
 import os
 import random
 import time
+from datetime import datetime
 from typing import List, Optional, Tuple, Dict
 
 import openai
@@ -21,20 +22,48 @@
 except ImportError:
     from backend_request_func import get_tokenizer
 
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler()
-    ]
-)
+
+def setup_logging(log_filename, level=logging.INFO):
+    """
+    Set up the global logging configuration. Logs are written to the specified file and also printed to the console.
+
+    :param log_filename: path of the log output file
+    """
+
+    logging.basicConfig(
+        level=level,
+        format="%(asctime)s - %(levelname)s - %(message)s",
+        datefmt="%Y-%m-%d %H:%M:%S",
+    )
+    if not os.path.exists((log_dir := os.path.dirname(log_filename))):
+        os.makedirs(log_dir, exist_ok=True)
+
+    logger = logging.getLogger()
+    logger.setLevel(level)
+
+    # Create a handler that writes to the log file
+    file_handler = logging.FileHandler(log_filename)
+    file_handler.setLevel(level)
+
+    # Create a handler that writes to the console
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(level)
+
+    formatter = logging.Formatter(
+        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+    )
+    file_handler.setFormatter(formatter)
+    console_handler.setFormatter(formatter)
+    logger.addHandler(file_handler)
+
+    logging.info(f"Saving logs to {log_filename}")
 
 
 # Function to wrap the prompt into OpenAI's chat completion message format.
 def wrap_prompt_as_chat_message(prompt: str):
     """
     Wrap the prompt into OpenAI's chat completion message format.
-    
+
     :param prompt: The user prompt to be converted.
     :return: A list containing chat completion messages.
""" @@ -96,6 +125,7 @@ def build_openai_endpoints(deployment_plan): # Asynchronous request handler async def send_request(client, model, endpoint, prompt, output_file): start_time = asyncio.get_event_loop().time() + start_ts = time.time() try: response = await client.chat.completions.create( model=model, @@ -113,24 +143,34 @@ async def send_request(client, model, endpoint, prompt, output_file): result = { "model": model, "endpoint": endpoint, + "start_timestamp": start_ts, + "latency": latency, "output": output_text, "prompt_tokens": prompt_tokens, "output_tokens": output_tokens, "total_tokens": total_tokens, - "latency": latency, - "throughput": throughput + "throughput": throughput, } - # Write result to JSONL file - output_file.write(json.dumps(result) + "\n") - output_file.flush() # Ensure data is written immediately to the file - logging.info( f"Request for {model} completed in {latency:.2f} seconds with throughput {throughput:.2f} tokens/s, answer: {output_text[:30]}...") return result except Exception as e: logging.error(f"Error sending request to {model} at {endpoint}: {str(e)}") + result = { + "model": model, + "endpoint": endpoint, + "start_timestamp": start_ts, + "latency": asyncio.get_event_loop().time() - start_time, + "start_time": start_time, + "error": str(e), + } return None + finally: + # Write result to JSONL file + output_file.write(json.dumps(result) + "\n") + output_file.flush() # Ensure data is written immediately to the file + # Benchmark requests and log results into the specified file @@ -195,12 +235,17 @@ def sample_sharegpt_requests( def main(args): + WORKLOAD_MODE = args.concurrency is not None tokenizer = get_tokenizer(args.model, trust_remote_code=True) if args.dataset_path is not None: logging.info(f"Start to sample {args.num_prompts} prompts from {args.dataset_path}") + num_prompts = args.num_prompts + if WORKLOAD_MODE: + # length * avearge bar + num_prompts = int(args.w_B * args.w_duration_sec / args.w_interval_sec) input_requests = sample_sharegpt_requests( dataset_path=args.dataset_path, - num_requests=args.num_prompts, + num_requests=num_prompts, tokenizer=tokenizer, fixed_output_len=args.sharegpt_output_len, ) @@ -214,7 +259,7 @@ def main(args): openai_clients = build_openai_clients(openai_endpoints) start_time = time.time() - if args.concurrency is not None: + if WORKLOAD_MODE: logging.info(f"Starting benchmark for {args.num_prompts} prompts with deployment {args.deployment_endpoints}") asyncio.run(benchmark_requests(openai_clients, openai_endpoints, input_requests, args.num_prompts, args.concurrency, args.output_file_path)) @@ -230,8 +275,7 @@ def main(args): ) # if you want to see the workload traffic trend plot_workload({"llm": workloads}, interval_sec=interval_sec, - output_path=f"workload_A{args.w_A}_B{args.w_B}_P{args.w_period}_" - f"D{args.w_duration_sec}s_I{interval_sec}s.png") + output_path=f"workload_plot/{identifier}.png") next_start = start_time + interval_sec for idx, each_input_requests in enumerate(workloads): @@ -242,7 +286,7 @@ def main(args): asyncio.run(benchmark_requests(openai_clients, openai_endpoints, each_input_requests, len(each_input_requests), len(each_input_requests), args.output_file_path)) - # wait until passing args.w_interval_sec + # wait until passing args.w_interval_sec wait_time = next_start - time.time() if wait_time > 0: time.sleep(wait_time) @@ -275,4 +319,14 @@ def main(args): parser = EngineArgs.add_cli_args(parser) args = parser.parse_args() + + if args.concurrency is not None: + identifier = 
f"onestep_np{args.num_prompts}_c{args.concurrency}" + else: + identifier = f"workload_A{args.w_A}_B{args.w_B}_P{args.w_period}_D{args.w_duration_sec}s_I{args.w_interval_sec}s" + identifier += "_" + datetime.now().strftime("%Y%m%d_%H%M%S") + args.output_file_path = f"output_stats/output_{identifier}.jsonl" + os.makedirs(os.path.dirname(args.output_file_path), exist_ok=True) + + setup_logging(f'logs/bench_{identifier}.log') main(args) diff --git a/benchmarks/autoscaling/kpa.yaml b/benchmarks/autoscaling/kpa.yaml index 6757f622..f91ab827 100644 --- a/benchmarks/autoscaling/kpa.yaml +++ b/benchmarks/autoscaling/kpa.yaml @@ -14,5 +14,5 @@ spec: minReplicas: 1 maxReplicas: 10 targetMetric: "vllm:gpu_cache_usage_perc" - targetValue: "50" + targetValue: "0.5" scalingStrategy: "KPA" \ No newline at end of file diff --git a/pkg/controller/podautoscaler/podautoscaler_controller.go b/pkg/controller/podautoscaler/podautoscaler_controller.go index 0b3e50de..f1920c8c 100644 --- a/pkg/controller/podautoscaler/podautoscaler_controller.go +++ b/pkg/controller/podautoscaler/podautoscaler_controller.go @@ -72,7 +72,7 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { Scheme: mgr.GetScheme(), EventRecorder: mgr.GetEventRecorderFor("PodAutoscaler"), Mapper: mgr.GetRESTMapper(), - resyncInterval: 30 * time.Second, // TODO: this should be override by an environment variable + resyncInterval: 10 * time.Second, // TODO: this should be override by an environment variable eventCh: make(chan event.GenericEvent), } diff --git a/pkg/controller/podautoscaler/scaler/kpa.go b/pkg/controller/podautoscaler/scaler/kpa.go index 468b13f2..b41633b6 100644 --- a/pkg/controller/podautoscaler/scaler/kpa.go +++ b/pkg/controller/podautoscaler/scaler/kpa.go @@ -202,6 +202,14 @@ func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.Name isOverPanicThreshold := dppc/readyPodsCount >= spec.PanicThreshold + klog.V(4).InfoS("--- KPA Details", "readyPodsCount", readyPodsCount, + "MaxScaleUpRate", spec.MaxScaleUpRate, "MaxScaleDownRate", spec.MaxScaleDownRate, + "TargetValue", spec.TargetValue, "PanicThreshold", spec.PanicThreshold, + "StableWindow", spec.StableWindow, "ScaleDownDelay", spec.ScaleDownDelay, + "dppc", dppc, "dspc", dspc, "desiredStablePodCount", desiredStablePodCount, + "PanicThreshold", spec.PanicThreshold, "isOverPanicThreshold", isOverPanicThreshold, + ) + if k.panicTime.IsZero() && isOverPanicThreshold { // Begin panicking when we cross the threshold in the panic window. klog.InfoS("Begin panicking") @@ -221,10 +229,10 @@ func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.Name // In some edgecases stable window metric might be larger // than panic one. And we should provision for stable as for panic, // so pick the larger of the two. + klog.InfoS("Operating in panic mode.", "desiredPodCount", desiredPodCount, "desiredPanicPodCount", desiredPanicPodCount) if desiredPodCount < desiredPanicPodCount { desiredPodCount = desiredPanicPodCount } - klog.InfoS("Operating in panic mode.") // We do not scale down while in panic mode. Only increases will be applied. 
 		if desiredPodCount > k.maxPanicPods {
 			klog.InfoS("Increasing pods count.", "originalPodCount", originalReadyPodsCount, "desiredPodCount", desiredPodCount)
@@ -243,6 +251,7 @@ func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.Name
 	// not the same in the case where two Scale()s happen in the same time
 	// interval (because the largest will be picked rather than the most recent
 	// in that case).
 	if k.delayWindow != nil {
+		klog.V(4).InfoS("DelayWindow details", "delayWindow", k.delayWindow.String())
 		k.delayWindow.Record(now, float64(desiredPodCount))
 		delayedPodCount, err := k.delayWindow.Max()
diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go
index 1ad18faa..442a426d 100644
--- a/pkg/utils/pod.go
+++ b/pkg/utils/pod.go
@@ -20,6 +20,8 @@ import (
 	"context"
 	"fmt"
 
+	"k8s.io/klog/v2"
+
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -128,9 +130,11 @@ func CountReadyPods(podList *v1.PodList) (int64, error) {
 
 	readyPodCount := 0
 	for _, pod := range podList.Items {
-		if pod.Status.Phase == v1.PodRunning && IsPodReady(&pod) {
+		isReady := IsPodReady(&pod)
+		if pod.Status.Phase == v1.PodRunning && isReady {
 			readyPodCount++
 		}
+		klog.V(4).InfoS("CountReadyPods Pod status", "name", pod.Name, "phase", pod.Status.Phase, "ready", isReady)
 	}
 
 	return int64(readyPodCount), nil