From 02b0ac50eae18b70fd5ff959ac045a881b96c333 Mon Sep 17 00:00:00 2001 From: Jingyuan Zhang Date: Thu, 5 Dec 2024 15:35:47 -0800 Subject: [PATCH] Revert "Refactor generator to generate time-based traces (#478)" This reverts commit aa77efb5f83285c3079afbc0400916cd4a152c1e. --- benchmarks/generator/README.md | 11 +- benchmarks/generator/sample_request.py | 103 ------------- benchmarks/generator/utils.py | 140 ++++++++++++----- benchmarks/generator/workload_generator.py | 166 +++++++++------------ 4 files changed, 185 insertions(+), 235 deletions(-) delete mode 100644 benchmarks/generator/sample_request.py diff --git a/benchmarks/generator/README.md b/benchmarks/generator/README.md index eaec9b5f..57006818 100644 --- a/benchmarks/generator/README.md +++ b/benchmarks/generator/README.md @@ -5,18 +5,17 @@ If no trace file path is specified, the generator will generate workload file ba ``` export SHARE_GPT_PATH=${PATH_TO_SHARE_GPT_FILE} -python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --interval-ms 1000 --duration-ms 600000 --trace-type synthetic --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output "output" +python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --num-requests 10000 --trace-type synthetic --output "output" ``` -Here ```--interval-ms``` specifies the granularity of concurently dispatched requests (in milliseconds). ```--duration-ms``` specifies the total length of the trace in milliseconds. The file would be stored under ```output``` folder based on the name of different patterns. And the plot illustrates the workload pattern will be under the ```plot``` directory. -## Generate a workload file based on internal load summary .csv file +## Generate a workload file based on load summary .csv file ``` export SUMMARY_FILE=${PATH_TO_SUMMARY_FILE} export SHARE_GPT_PATH=${PATH_TO_SHARE_GPT_FILE} -python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --interval-ms 1000 --duration-ms 3600000 --trace-type internal --trace-file "$SUMMARY_FILE" --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output "output" +python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --num-requests 100000 --trace-type summary --trace-file "$SUMMARY_FILE" --output "output" ``` This generator assumes the trace file to be in the following format @@ -37,14 +36,14 @@ This generator generate workload file (in .json format) under ```output``` folde And the plot illustrating the workload pattern will be under the ```plot``` directory. -## Generate a workload file based on Azure LLM Trace +## Generate a workload file based on Azure LLM Trace To produce a workload based on [Azure LLM Trace](https://github.com/Azure/AzurePublicDataset/tree/master/data), use the following commands: ``` export AZURE_TRACE_NAME=${PATH_TO_AZURE_TRACE_NAME} export SHARE_GPT_PATH=${PATH_TO_SHARE_GPT_FILE} -python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --interval-ms 1000 --duration-ms 3600000 --trace-type azure --trace-file "$AZURE_TRACE_NAME" --group-interval-seconds 1 --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output "output" +python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --num-requests 100000 --trace-type azure --trace-file "$AZURE_TRACE_NAME" --group-interval-seconds 1 --model "meta-llama/Llama-2-7b-hf" --output "output" ``` Note that the trace file contains both input and output lengths.
The dataset in ```$SHARE_GPT_PATH``` therefore needs to be tokenized so that requests can be sampled by their input/output token lengths, which is why a tokenizer must be specified when generating from this trace. Use ```--group-interval-seconds``` to specify the grouping interval applied to the original trace. The file will be stored under the ```output``` folder, and the plot illustrating the workload pattern will be under the ```plot``` directory. \ No newline at end of file diff --git a/benchmarks/generator/sample_request.py b/benchmarks/generator/sample_request.py deleted file mode 100644 index e3d84775..00000000 --- a/benchmarks/generator/sample_request.py +++ /dev/null @@ -1,103 +0,0 @@ -import logging -import json - -import pandas as pd - -from typing import Tuple, Optional, List -from transformers import PreTrainedTokenizerBase - -def load_sharegpt_requests( - dataset_path: str, - tokenizer: PreTrainedTokenizerBase, - ) -> pd.DataFrame: - # Load the dataset into a DataFrame - with open(dataset_path, encoding='utf-8') as f: - dataset = json.load(f) - dataset = [ - (data["conversations"][0]["value"], data["conversations"][1]["value"]) - for data in dataset if len(data["conversations"]) >= 2 - ] - df = pd.DataFrame(dataset, columns=["prompt", "completion"]) - logging.warn(f"...Start dataframe transformation") - # Tokenize and calculate lengths - df["prompt_len"] = df["prompt"].apply(lambda x: len(tokenizer(x).input_ids)) - df["completion_len"] = df["completion"].apply(lambda x: len(tokenizer(x).input_ids)) - logging.warn(f"...Complete dataframe transformation") - return df - -def sample_sharegpt_requests( - dataset_path: str, - num_requests: int, - tokenizer: Optional[PreTrainedTokenizerBase] = None, - fixed_output_len: Optional[int] = None, -) -> List[Tuple[str, int, int, None]]: - # Load the dataset - with open(dataset_path, encoding='utf-8') as f: - dataset = json.load(f) - dataset = [data for data in dataset if len(data["conversations"]) >= 2] - dataset = [(data["conversations"][0]["value"], data["conversations"][1]["value"]) for data in dataset] - - filtered_dataset: List[Tuple[str, int, int]] = [] - for i in range(len(dataset)): - if len(filtered_dataset) == num_requests: - break - prompt = dataset[i][0] - if tokenizer is not None: - prompt_token_ids = tokenizer(prompt).input_ids - completion = dataset[i][1] - completion_token_ids = tokenizer(completion).input_ids - prompt_len = len(prompt_token_ids) - output_len = len(completion_token_ids) if fixed_output_len is None else fixed_output_len - if prompt_len < 4 or (fixed_output_len is None and output_len < 4): - continue - if prompt_len > 1024 or prompt_len + output_len > 2048: - continue - filtered_dataset.append((prompt, prompt_len, output_len, None)) - else: - filtered_dataset.append((prompt, -1, -1, None)) - - return filtered_dataset - - -def sample_sharegpt_requests_len_range( - df: pd.DataFrame, - num_requests: int, - input_lens: List[int], - output_lens: List[int], - initial_err_perc: Optional[float] = 0.5, - err_step: float = 0.05 -) -> List[Tuple[str, int, int, None]]: - filtered_results = [] - - # Relaxation mechanism - for i in range(num_requests): - input_len = input_lens[i] - output_len = output_lens[i] - err_perc = initial_err_perc - - while err_perc >= 0: - input_range = (int(input_len * err_perc), int(input_len * (1 + err_perc))) - output_range = (int(output_len * err_perc), int(output_len * (1 + err_perc))) - - filtered = df[ - (df["prompt_len"] >= input_range[0]) & - (df["prompt_len"] <= input_range[1]) & - 
(df["completion_len"] >= output_range[0]) & - (df["completion_len"] <= output_range[1]) - ] - - if not filtered.empty: - # Select the first match or random sample - sample = filtered.iloc[0] # Or filtered.sample(1) for random - filtered_results.append((sample["prompt"], sample["prompt_len"], sample["completion_len"], None)) - break # Stop relaxing for this request once a match is found - - # Reduce err_perc for next iteration - logging.warn(f"Relax err_perc {err_perc} by {err_step}") - err_perc -= err_step - - if err_perc < 0: - raise Exception(f"No match found for request {i + 1} even after relaxing err_perc to 0") - - return filtered_results - diff --git a/benchmarks/generator/utils.py b/benchmarks/generator/utils.py index 5b2c8f6c..e9ce4831 100644 --- a/benchmarks/generator/utils.py +++ b/benchmarks/generator/utils.py @@ -3,10 +3,11 @@ import os import numpy as np +import pandas as pd import matplotlib.pyplot as plt -from typing import List, Union, Any, Optional -from transformers import (AutoTokenizer, PreTrainedTokenizer, +from typing import Tuple, Optional, List, Union +from transformers import (AutoTokenizer, PreTrainedTokenizer, PreTrainedTokenizerBase, PreTrainedTokenizerFast) def make_serializable(data): @@ -24,27 +25,124 @@ def make_serializable(data): else: return data +def sample_sharegpt_requests( + dataset_path: str, + num_requests: int, + tokenizer: Optional[PreTrainedTokenizerBase] = None, + fixed_output_len: Optional[int] = None, +) -> List[Tuple[str, int, int, None]]: + # Load the dataset + with open(dataset_path, encoding='utf-8') as f: + dataset = json.load(f) + dataset = [data for data in dataset if len(data["conversations"]) >= 2] + dataset = [(data["conversations"][0]["value"], data["conversations"][1]["value"]) for data in dataset] + + filtered_dataset: List[Tuple[str, int, int]] = [] + for i in range(len(dataset)): + if len(filtered_dataset) == num_requests: + break + prompt = dataset[i][0] + if tokenizer is not None: + prompt_token_ids = tokenizer(prompt).input_ids + completion = dataset[i][1] + completion_token_ids = tokenizer(completion).input_ids + prompt_len = len(prompt_token_ids) + output_len = len(completion_token_ids) if fixed_output_len is None else fixed_output_len + if prompt_len < 4 or (fixed_output_len is None and output_len < 4): + continue + if prompt_len > 1024 or prompt_len + output_len > 2048: + continue + filtered_dataset.append((prompt, prompt_len, output_len, None)) + else: + filtered_dataset.append((prompt, -1, -1, None)) + + return filtered_dataset + +def load_sharegpt_requests( + dataset_path: str, + tokenizer: PreTrainedTokenizerBase, + ) -> pd.DataFrame: + # Load the dataset into a DataFrame + with open(dataset_path, encoding='utf-8') as f: + dataset = json.load(f) + dataset = [ + (data["conversations"][0]["value"], data["conversations"][1]["value"]) + for data in dataset if len(data["conversations"]) >= 2 + ] + df = pd.DataFrame(dataset, columns=["prompt", "completion"]) + logging.info(f"...Start dataframe transformation") + # Tokenize and calculate lengths + df["prompt_len"] = df["prompt"].apply(lambda x: len(tokenizer(x).input_ids)) + df["completion_len"] = df["completion"].apply(lambda x: len(tokenizer(x).input_ids)) + logging.info(f"...Complete dataframe transformation") + return df + + +def sample_sharegpt_requests_len_range( + df: pd.DataFrame, + num_requests: int, + input_lens: List[int], + output_lens: List[int], + initial_err_perc: Optional[float] = 0.5, + err_step: float = 0.05 +) -> List[Tuple[str, int, int, None]]: + 
filtered_results = [] + + # Relaxation mechanism + for i in range(num_requests): + input_len = input_lens[i] + output_len = output_lens[i] + err_perc = initial_err_perc + + while err_perc >= 0: + input_range = (int(input_len * err_perc), int(input_len * (1 + err_perc))) + output_range = (int(output_len * err_perc), int(output_len * (1 + err_perc))) + + filtered = df[ + (df["prompt_len"] >= input_range[0]) & + (df["prompt_len"] <= input_range[1]) & + (df["completion_len"] >= output_range[0]) & + (df["completion_len"] <= output_range[1]) + ] + + if not filtered.empty: + # Select the first match or random sample + sample = filtered.iloc[0] # Or filtered.sample(1) for random + filtered_results.append((sample["prompt"], sample["prompt_len"], sample["completion_len"], None)) + break # Stop relaxing for this request once a match is found + + # Reduce err_perc for next iteration + logging.warn(f"Relax err_perc {err_perc} by {err_step}") + err_perc -= err_step + + if err_perc < 0: + raise Exception(f"No match found for request {i + 1} even after relaxing err_perc to 0") + + logging.info(f"Successfully found {len(filtered_results)} requests") + return filtered_results + + def get_tokenizer( pretrained_model_name_or_path: str, trust_remote_code: bool ) -> Union[PreTrainedTokenizer, PreTrainedTokenizerFast]: return AutoTokenizer.from_pretrained(pretrained_model_name_or_path, trust_remote_code=trust_remote_code) -def plot_workload(workload_dict, interval_ms, output_file: str = None): +def plot_workload(workload_dict, interval_sec, output_file: str = None): """ Plots the concurrency (item length) of the generated workload. Args: workload_dict (dict): A dictionary where the keys are workload names (labels) and the values are lists of lists representing the workload. - interval_ms (int): Interval in milliseconds. 
+ interval_sec (int): Interval in seconds. """ fig, ax = plt.subplots() for workload_name, workload in workload_dict.items(): - concurrency_values = [len(item) for (_, item) in workload] - ax.plot(np.arange(len(concurrency_values)) * interval_ms, concurrency_values, label=workload_name) + concurrency_values = [len(item) for item in workload] + ax.plot(np.arange(len(concurrency_values)) * interval_sec, concurrency_values, label=workload_name) ax.set_ylim(0,) - plt.xlabel('Time (ms)') + plt.xlabel('Time (Sec)') plt.ylabel('Concurrency') plt.title('Workload Concurrency') plt.legend() @@ -53,30 +151,4 @@ def plot_workload(workload_dict, interval_ms, output_file: str = None): else: os.makedirs(os.path.dirname(output_file), exist_ok=True) plt.savefig(output_file) - logging.warn(f'Saved workload plot to {output_file}') - -def save_workload(load_struct: List[Any], - output_path: str, - use_jsonl: Optional[bool] = False): - if use_jsonl: - with open(output_path + ".jsonl", "w") as file: - for row in load_struct: - json_line = json.dumps(row) # Convert list to JSON string - file.write(json_line + "\n") - logging.warn(f'Saved workload file to {output_path + ".jsonl"}') - else: - with open(output_path + ".json", 'w') as file: - json.dump(load_struct, file, indent=4) - logging.warn(f'Saved workload file to {output_path + ".json"}') - -def load_workload(load_struct: List[Any], - input_path: str, - use_jsonl: Optional[bool] = False) -> List[Any]: - load_struct = None - if use_jsonl: - with open(input_path, "r") as file: - load_struct = [json.loads(line) for line in file] - else: - with open(input_path, "r") as file: - load_struct = json.load(file) - return load_struct \ No newline at end of file + logging.info(f'Saved workload plot to {output_file}') \ No newline at end of file diff --git a/benchmarks/generator/workload_generator.py b/benchmarks/generator/workload_generator.py index 4b39a11d..4adaa20c 100644 --- a/benchmarks/generator/workload_generator.py +++ b/benchmarks/generator/workload_generator.py @@ -1,50 +1,52 @@ import logging import math import random +import json import pandas as pd import argparse import csv from typing import Tuple, List, Any from transformers import PreTrainedTokenizerBase -from datetime import timedelta -from sample_request import (load_sharegpt_requests, sample_sharegpt_requests, sample_sharegpt_requests_len_range) -from utils import (get_tokenizer, plot_workload, make_serializable, save_workload) +from datetime import datetime, timedelta +from utils import (sample_sharegpt_requests, load_sharegpt_requests, sample_sharegpt_requests_len_range, get_tokenizer, plot_workload, make_serializable) -def generate_from_internal_csv(file_path: str, - duration_ms: int, - summary_interval_ms: int, - interval_ms: int = 1000, +def generate_from_summary_csv(input_requests: List[Any], + file_path: str, + sampling_granularity_seconds: int = 15, ) -> List[List[Any]]: - total_requests_from_summary = [] + total_requests_from_trace = [] with open(file_path, 'r') as file: reader = csv.DictReader(file) for row in reader: if 'Total' in row: total_value = row['Total'] if total_value: - total_requests_from_summary.append(float(total_value)) + total_requests_from_trace.append(float(total_value)) workloads = [] base = 0 - ts = 0 - for interval_requests in total_requests_from_summary: - mean_rate = round(interval_requests/(summary_interval_ms / interval_ms)) - for ts_delta in list(range(0, summary_interval_ms, interval_ms)): - workloads.append((ts + ts_delta, range(base, base + mean_rate))) + end = False + for 
interval_requests in total_requests_from_trace: + if end: + break + mean_rate = round(interval_requests/sampling_granularity_seconds) + for _ in range(0, sampling_granularity_seconds): + bound = min(base + mean_rate, len(input_requests)) + workloads.append(input_requests[base : bound]) base += mean_rate - ts += summary_interval_ms - if ts > duration_ms: - break + if base >= len(input_requests): + end = True + break return workloads -def generate_synthetic(A=1, B=1, +def generate_synthetic(input_requests: List[Any], A=1, B=1, sigma=0.1, only_rise: bool = False, omega: float = None, period=0.25, length: int = None, - duration_ms: int = None, - interval_ms: int = None, + duration_sec: int = None, + interval_sec: int = None, ) -> List[List[Any]]: """ Generates a workload based on a given list of input requests and a concurrency function. @@ -62,9 +64,9 @@ def generate_synthetic(A=1, B=1, omega (float, optional): if None, omega = pi / (2 * length / period) period (float, optional): See omega. Defaults to 0.25. only_rise: if True, the concurrency will monotonically increase - length (int, optional): if None, length = duration_ms / interval_ms - duration_ms (int, optional): See param: length - interval_ms (int, optional): See param: length + length (int, optional): if None, length = test_duration_sec / interval_sec + duration_sec (int, optional): See param: length + interval_sec (int, optional): See param: length Returns: list: A list of items, where each item is a list of requests to be sent concurrently. @@ -89,20 +91,19 @@ def math_function(t): noise = random.gauss(0, sigma) return round(trend + noise) - assert length is not None or (duration_ms is not None and interval_ms is not None), \ - "duration_ms and interval_ms must be specified if length is not None" + assert length is not None or (duration_sec is not None and interval_sec is not None), \ + "duration_sec and interval_sec must be specified if length is not None" if length is None: - length = int(duration_ms // interval_ms) + 1 + length = int(duration_sec // interval_sec) assert omega is not None or period is not None, "period must be specified if length is not None" if omega is None: omega = 2 * math.pi / (length / period) workload = [] t = 0 + input_length = len(input_requests) previous_concurrency = -1 start_index, end_index = 0, 0 - ts = 0 - base_req_id = 0 while t < length: current_concurrency = math_function(t) if only_rise: @@ -112,38 +113,32 @@ def math_function(t): # start from last end index start_index = end_index end_index += current_concurrency - workload.append((ts, [base_req_id + i for i in range(start_index, end_index)])) - base_req_id += current_concurrency - ts += interval_ms + workload.append([input_requests[i % input_length] for i in range(start_index, end_index)]) t += 1 return workload -def pair_requests_with_prompts_round_robin(workload: List[List[Any]], - prompts: List[Tuple[str, int, int, None]], - output_file: str = 'output/output', - to_jsonl: bool = False - ) -> List[List[Tuple[Any, str]]]: +def pair_requests_with_prompts(workload: List[List[Any]], prompts: List[Tuple[str, int, int, None]], output_file: str = 'output/output.json') -> List[List[Tuple[Any, str]]]: paired_workload = [] prompt_count = len(prompts) - for ts, requests in workload: + + for requests in workload: requests_with_prompts = [ prompts[request % prompt_count] for request in requests ] - paired_workload.append((ts, requests_with_prompts)) + paired_workload.append(requests_with_prompts) # Save to file - save_workload(paired_workload, 
output_file, use_jsonl = to_jsonl) + with open(output_file, 'w') as file: + json.dump(paired_workload, file) return paired_workload # generated_workload = generate_from_azure_csv(demo_requests, file_path=args.trace_file, sampling_granularity_seconds=15, output_file=args.output) def generate_from_azure_csv(file_path: str, prompt_file_path: str, - duration_ms: int, tokenizer: PreTrainedTokenizerBase, - interval_ms: int, + group_interval_seconds: int = 1, output_file: str = 'output/output.json', - to_jsonl: bool = False, ) -> List[List[Any]]: # Load the CSV file df = pd.read_csv(file_path) @@ -152,7 +147,7 @@ def generate_from_azure_csv(file_path: str, df['TIMESTAMP'] = pd.to_datetime(df['TIMESTAMP']) # Define the grouping time range (e.g., 1 second) - time_range = timedelta(milliseconds=interval_ms) + time_range = timedelta(seconds=group_interval_seconds) # Initialize a list to hold the grouped requests grouped_requests = [] @@ -161,11 +156,10 @@ def generate_from_azure_csv(file_path: str, df.set_index('TIMESTAMP', inplace=True) current_time = df.index.min() end_time = df.index.max() - logging.warn(f"Start generation from time {current_time} to {end_time}") + logging.info(f"Start generation from time {current_time} to {end_time}") sharegpt_df = load_sharegpt_requests(dataset_path = prompt_file_path, tokenizer = tokenizer) - ts = 0 while current_time <= end_time: # Select requests within the current time range mask = (df.index >= current_time) & (df.index < current_time + time_range) @@ -175,6 +169,7 @@ def generate_from_azure_csv(file_path: str, for _, row in group.iterrows(): input_lens.append(int(row['ContextTokens'])) output_lens.append(int(row['GeneratedTokens'])) + logging.info(f"Sample iteration {len(grouped_requests)} for {len(input_lens)} requests") sampled_requests = sample_sharegpt_requests_len_range( df = sharegpt_df, num_requests = len(input_lens), @@ -185,77 +180,64 @@ def generate_from_azure_csv(file_path: str, ) if sampled_requests: # Only add non-empty groups - grouped_requests.append((ts, sampled_requests)) - ts += interval_ms - if ts > duration_ms: - break + grouped_requests.append(sampled_requests) + # Move to the next time range current_time += time_range # Print or process grouped_requests as needed # Save to file grouped_requests = make_serializable(grouped_requests) - save_workload(grouped_requests, output_file, use_jsonl = to_jsonl) + with open(output_file, 'w') as file: + json.dump(grouped_requests, file) + print(grouped_requests) return grouped_requests if __name__ == '__main__': parser = argparse.ArgumentParser(description='Workload Generator') - parser.add_argument('--prompt-file', type=str, required=True, help='File containing prompts.') - parser.add_argument('--num-prompts', type=int, default=100, help='Number of prompts to sample.') - parser.add_argument('--group-interval-seconds', type=int, default=1, help='Grouping interval seconds.') - parser.add_argument('--trace-type', type=str, required=True, default="synthetic", help='Type of trace consumed. Choose among: synthetic, internal, azure') - parser.add_argument('--trace-file', type=str, required=False, default=None, help='File containing original trace file csv, which workload generator depends upon to convert to workload format. This is only needed for for internal/azure trace type. 
') - parser.add_argument('--model', type=str, required=False, default="Qwen/Qwen2.5-Coder-7B-Instruct", help='Target model tokenizer.') - parser.add_argument('--output', type=str, required=False, default="output", help='Output path to the workload.') - parser.add_argument('--interval-ms', type=int, required=False, default=1000, help='Granularity of request injection interval in milliseconds.') - parser.add_argument('--duration-ms', type=int, default=60000, help='Duration of the trace generated.') - parser.add_argument('--to-jsonl', dest='to_jsonl', action='store_true', help='Set output data format to .jsonl (default .json).') + parser.add_argument('--prompt-file', type=str, required=True, help='File containing prompts') + parser.add_argument('--num-prompts', type=int, default=100, help='Number of prompts to sample') + parser.add_argument('--num-requests', type=int, default=10000, help='Number of requests in total') + parser.add_argument('--group-interval-seconds', type=int, default=1, help='Grouping interval seconds') + parser.add_argument('--trace-type', type=str, required=True, default="synthetic", help='Type of trace consumed') + parser.add_argument('--trace-file', type=str, required=False, default=None, help='File containing trace CSV') + parser.add_argument('--model', type=str, required=False, default="meta-llama/Llama-2-7b-hf", help='Target model tokenizer') + parser.add_argument('--output', type=str, required=False, default="output/output", help='Output path') args = parser.parse_args() + # Load prompts from a file + prompts = sample_sharegpt_requests(args.prompt_file, args.num_prompts) + + # Generate input requests (ascending integers) + demo_requests = list(range(1, 1 + args.num_requests)) + interval = 30 # Generate workloads and pair with prompts workload_dict = {} - tokenizer = get_tokenizer(pretrained_model_name_or_path = args.model, trust_remote_code = True) if args.trace_type == "synthetic": - # Load prompts from a file - prompts = sample_sharegpt_requests(dataset_path = args.prompt_file, num_requests = args.num_prompts, tokenizer = tokenizer) # Generate workloads with different parameters scenarios = { - 'Quick Rising': {'duration_ms': args.duration_ms, 'interval_ms': args.interval_ms, 'A': 5, 'period': 5, 'only_rise': True}, - 'Slow Rising': {'duration_ms': args.duration_ms, 'interval_ms': args.interval_ms, 'A': 5, 'period': 0.25, 'only_rise': True}, - 'Slight Fluctuation': {'duration_ms': args.duration_ms, 'interval_ms': args.interval_ms, 'A': 5, 'B': 5, 'period': 1, 'only_rise': False}, - 'Severe Fluctuation': {'duration_ms': args.duration_ms, 'interval_ms': args.interval_ms, 'A': 5, 'B': 10, 'period': 12, 'only_rise': False}, + 'Quick Rising': {'duration_sec': 600, 'interval_sec': interval, 'A': 5, 'period': 5, 'only_rise': True}, + 'Slow Rising': {'duration_sec': 600, 'interval_sec': interval, 'A': 5, 'period': 0.25, 'only_rise': True}, + 'Slight Fluctuation': {'duration_sec': 600, 'interval_sec': interval, 'A': 5, 'B': 5, 'period': 1, 'only_rise': False}, + 'Severe Fluctuation': {'duration_sec': 600, 'interval_sec': interval, 'A': 5, 'B': 10, 'period': 12, 'only_rise': False}, } for scenario_name, params in scenarios.items(): - generated_workload = generate_synthetic(**params) - paired_workload = pair_requests_with_prompts_round_robin(workload = generated_workload, - prompts = prompts, - output_file = f"{args.output}/{scenario_name}", - to_jsonl = args.to_jsonl) + generated_workload = generate_synthetic(demo_requests, **params) + paired_workload = 
pair_requests_with_prompts(generated_workload, prompts, f"{args.output}/{scenario_name}.json") workload_dict[scenario_name] = paired_workload # Plot the workloads - plot_workload(workload_dict, interval_ms=args.interval_ms, output_file=f"plot/synthetic.pdf") - elif args.trace_type == "internal": - # Load prompts from a file - prompts = sample_sharegpt_requests(dataset_path = args.prompt_file, num_requests = args.num_prompts, tokenizer = tokenizer) - # Generate input requests (ascending integers)quit - generated_workload = generate_from_internal_csv(file_path=args.trace_file, duration_ms = args.duration_ms, summary_interval_ms=15000, interval_ms=args.interval_ms) - generated_workload = pair_requests_with_prompts_round_robin(workload = generated_workload, - prompts = prompts, - output_file = f"{args.output}/internal", - to_jsonl = args.to_jsonl) - workload_dict["internal"] = generated_workload + plot_workload(workload_dict, interval_sec=interval, output_file=f"plot/synthetic.pdf") + elif args.trace_type == "summary": + generated_workload = generate_from_summary_csv(demo_requests, file_path=args.trace_file, sampling_granularity_seconds=15) + generated_workload = pair_requests_with_prompts(generated_workload, prompts, f"{args.output}/summary.json") + workload_dict["summary"] = generated_workload # Plot the workloads - plot_workload(workload_dict, interval_ms=args.interval_ms, output_file=f"plot/internal.pdf") + plot_workload(workload_dict, interval_sec=interval, output_file=f"plot/summary.pdf") elif args.trace_type == "azure": - generated_workload = generate_from_azure_csv(file_path=args.trace_file, - prompt_file_path = args.prompt_file, - duration_ms = args.duration_ms, - tokenizer = tokenizer, - interval_ms = args.interval_ms, - output_file = f"{args.output}/azure", - to_jsonl = args.to_jsonl) + tokenizer = get_tokenizer(pretrained_model_name_or_path = args.model, trust_remote_code = True) + generated_workload = generate_from_azure_csv(file_path=args.trace_file, prompt_file_path = args.prompt_file, tokenizer = tokenizer, group_interval_seconds=1, output_file=f"{args.output}/azure.json") workload_dict["azure"] = generated_workload # Plot the workloads - plot_workload(workload_dict, interval_ms=args.interval_ms, output_file=f"plot/azure.pdf") + plot_workload(workload_dict, interval_sec=interval, output_file=f"plot/azure.pdf")
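The synthetic scenarios above shape concurrency with `math_function`, a sinusoidal trend plus Gaussian noise controlled by `A`, `B`, `sigma`, `period`/`omega`, and `only_rise`. The exact trend expression is outside this excerpt, so the sketch below is a minimal stand-in that assumes a plain `A * sin(omega * t) + B` trend; the helper name `synthetic_concurrency` is illustrative only, not part of the generator.

```
import math
import random

def synthetic_concurrency(length: int, A: float = 5, B: float = 5,
                          sigma: float = 0.1, period: float = 1,
                          only_rise: bool = False) -> list:
    """Toy stand-in for generate_synthetic's concurrency curve.
    The A * sin(omega * t) + B trend is an assumption; only the omega
    default (2 * pi / (length / period)) mirrors the generator above."""
    omega = 2 * math.pi / (length / period)
    values, previous = [], -1
    for t in range(length):
        level = round(A * math.sin(omega * t) + B + random.gauss(0, sigma))
        if only_rise:
            level = max(level, previous)  # keep the curve monotonically rising
        previous = level
        values.append(max(level, 0))
    return values

print(synthetic_concurrency(length=20, only_rise=True))
```

Each value plays the role of one interval's concurrency, which the generator turns into a bucket of that many prompts.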
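For the summary trace path, `generate_from_summary_csv` reads the file with `csv.DictReader`, consumes only the `Total` column, and spreads each row's count evenly across `sampling_granularity_seconds` buckets (15 by default). A minimal input that exercises it could look like the following; the `Time` column is illustrative, since only `Total` is actually read:

```
"Time","Total"
"2024-12-05 00:00:00","45"
"2024-12-05 00:00:15","60"
"2024-12-05 00:00:30","30"
```

With `Total` = 45 and the default granularity of 15, the first row yields 15 buckets of `round(45 / 15) = 3` requests each.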
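Whichever trace type is chosen, `pair_requests_with_prompts` (and the Azure path's `json.dump`) writes the workload as a JSON list of per-interval buckets, each bucket a list of `(prompt, prompt_len, output_len, None)` entries. A small reader like the sketch below can sanity-check a generated file; the `inspect_workload` helper, the `output/summary.json` path, and the 30-second interval (the value hard-coded in `__main__`) are assumptions for illustration.

```
import json

def inspect_workload(path: str, interval_sec: int = 30) -> None:
    """Print per-interval concurrency for a generated workload file,
    assuming the list-of-buckets layout written by pair_requests_with_prompts."""
    with open(path) as f:
        buckets = json.load(f)
    for i, bucket in enumerate(buckets):
        print(f"t={i * interval_sec:>5}s  concurrency={len(bucket)}")

inspect_workload("output/summary.json")  # path is illustrative
```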