Benchmark Generator Refactoring (#655)
* adding smoothed requests pattern

* refactor synthetic generation to generate synthetic request size patterns

* update README

* clean up

* patterns specific tag for synthetic load

* add constant workload

* add manually configured pattern for synthetic workload

* clean up

* update

* clean up

---------

Co-authored-by: Le Xu <le.xu@bytedance.com>
Signed-off-by: Varun Gupta <varungup90@gmail.com>
2 people authored and varungup90 committed Feb 20, 2025
1 parent 8d9dae1 commit 1b36d59
Showing 7 changed files with 493 additions and 187 deletions.
58 changes: 53 additions & 5 deletions benchmarks/generator/README.md
@@ -9,28 +9,76 @@ wget https://huggingface.co/datasets/anon8231489123/ShareGPT_Vicuna_unfiltered/r
export SHAREGPT_FILE_PATH=/tmp/ShareGPT_V3_unfiltered_cleaned_split.json
```

### Generate a workload file with a constant target QPS (synthetic patterns)

```shell
export TARGET_QPS=1

python workload_generator.py --prompt-file $SHAREGPT_FILE_PATH --interval-ms 1000 --duration-ms 300000 --target-qps $TARGET_QPS --trace-type constant --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output-dir "output" --output-format jsonl
```

### Generate a workload file based on workload patterns (synthetic patterns)

If no trace file path is specified, the generator will generate workload file based on 4 synthetic pattern described [here](https://github.com/aibrix/aibrix/blob/main/benchmarks/autoscaling/bench_workload_generator.py):
The generator can produce workload files based on synthetic traffic (QPS), input-length (prompt-length), and output-length (completion-length) patterns. Currently, four patterns are supported (`quick_rising`, `slow_rising`, `slight_fluctuation`, `severe_fluctuation`), described [here](https://github.com/aibrix/aibrix/blob/main/benchmarks/autoscaling/bench_workload_generator.py):
```shell
python workload_generator.py --prompt-file $SHAREGPT_FILE_PATH --interval-ms 1000 --duration-ms 300000 --trace-type synthetic --traffic-pattern "slight_fluctuation" --prompt-len-pattern "slight_fluctuation" --completion-len-pattern "slight_fluctuation" --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output-dir "./output" --output-format jsonl
```

Alternatively, you can specify fluctuation patterns in .json files and pass them to the generator as shown below. Example configuration files are under the `config` directory.
```shell
python workload_generator.py --prompt-file $SHAREGPT_FILE_PATH --num-prompts 100 --interval-ms 1000 --duration-ms 600000 --trace-type synthetic --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output-dir "output"
python workload_generator.py --prompt-file $SHAREGPT_FILE_PATH --interval-ms 1000 --duration-ms 1400000 --trace-type synthetic --traffic-pattern-config config/traffic-config.json --prompt-len-pattern-config config/prompt-len-config.json --completion-len-pattern-config config/completion-len-config.json --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output-dir "./output" --output-format jsonl
```


Here `--interval-ms` specifies the granularity (in milliseconds) at which batches of concurrent requests are dispatched, and `--duration-ms` specifies the total length of the trace in milliseconds. For example, `--interval-ms 1000 --duration-ms 300000` produces a 300-second trace with one batch of requests dispatched every second.

The workload file is stored under the `output` folder and named after the pattern used. A plot illustrating the workload pattern is saved under the `plot` directory.

## Generate a workload file based on internal load summary .csv files

```shell
export SUMMARY_FILE=${PATH_TO_SUMMARY_FILE}
python workload_generator.py --prompt-file $SHAREGPT_FILE_PATH --num-prompts 100 --interval-ms 1000 --duration-ms 600000 --trace-type internal --traffic-file "$SUMMARY_FILE" --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output-dir "output"
export TRAFFIC_FILE=${PATH_TO_TRAFFIC_FILE}
export PROMPT_LEN_FILE=${PATH_TO_PROMPT_LEN_FILE}
export COMPLETION_LEN_FILE=${PATH_TO_COMPLETION_LEN_FILE}

python workload_generator.py --prompt-file $SHAREGPT_FILE_PATH --interval-ms 1000 --duration-ms 1800000 --trace-type internal --traffic-file "$TRAFFIC_FILE" --prompt-len-file "$PROMPT_LEN_FILE" --completion-len-file "$COMPLETION_LEN_FILE" --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output-dir "./output" --output-format jsonl --qps-scale 1.0 --output-scale 1.0 --input-scale 1.0 --internal-trace-type "maas"
```

This generator assumes trace file to be in the following format
The scaling factors here (e.g., `qps-scale`) scale the rates in the original trace down to the desired rates; for example, if the peak rate in the original file is 80 and the desired peak rate is 8, the scale should be set to 10.0.
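
For intuition, here is a minimal sketch of how such factors could be applied, assuming scaling is a straight division of the trace values; the helper name and signature are illustrative and not part of the generator:

```python
def scale_trace_point(qps: float, prompt_len: float, completion_len: float,
                      qps_scale: float = 1.0, input_scale: float = 1.0,
                      output_scale: float = 1.0):
    """Scale raw trace values down to the desired rates/lengths.

    E.g. a peak QPS of 80 with qps_scale=10.0 yields a target peak of 8.
    """
    return qps / qps_scale, prompt_len / input_scale, completion_len / output_scale
```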

### `maas` trace type
- With the `maas` trace type, the generator expects `$TRAFFIC_FILE` to be in the following format:
```
"Time","Total","Success","4xx Error"
2024-10-1 00:00:00,100,99,1
```

- `$PROMPT_LEN_FILE` is expected to be in the following format:
```
"Time","P50","P70","P90","P99"
```

- `$COMPLETION_LEN_FILE` is expected to be in the following format:
```
"Time","P50","P70","P95","P99"
```

### `cloudide` trace type
- With the `cloudide` trace type, the generator expects `$TRAFFIC_FILE` to be in the following format (the `"Rate"` column can have an arbitrary name):
```
"Time","Rate"
```

- `$PROMPT_LEN_FILE` is expected to be in the following format:
```
"Time","recv_bytes","sent_bytes"
```

- `$COMPLETION_LEN_FILE` is expected to be in the following format:
```
"Time","recv_bytes","sent_bytes"
```

### Indicate the length of prompt/completion
In this case, you can also indicate the request's prompt length via the `--prompt-len-file` flag and the output length via the `--completion-len-file` flag.
Based on these parameters, the generator selects prompts of matching length from the prompt file to simulate the request sizes of the real load.
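
For illustration only, a simplified sketch of this length-matching idea (the actual logic lives in `sample_request.py`, shown in the diff below; the helper name here is illustrative, while the `prompt_len`/`completion_len` column names follow that file):

```python
import sys
import pandas as pd

def match_request(df: pd.DataFrame, input_len=None, output_len=None,
                  err_perc: float = 0.1, err_step: float = 0.05) -> pd.Series:
    """Pick one row whose lengths fall within a tolerance of the targets,
    widening the tolerance until a match is found; fall back to random."""
    while err_perc <= 1:
        lo_in, hi_in = (0, sys.maxsize) if input_len is None else \
            (int(input_len * (1 - err_perc)), int(input_len * (1 + err_perc)))
        lo_out, hi_out = (0, sys.maxsize) if output_len is None else \
            (int(output_len * (1 - err_perc)), int(output_len * (1 + err_perc)))
        filtered = df[df["prompt_len"].between(lo_in, hi_in) &
                      df["completion_len"].between(lo_out, hi_out)]
        if not filtered.empty:
            return filtered.sample(n=1).iloc[0]
        err_perc += err_step  # relax the tolerance and retry
    return df.sample(n=1).iloc[0]  # no match even at 100% tolerance
```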
8 changes: 8 additions & 0 deletions benchmarks/generator/config/completion-len-config.json
@@ -0,0 +1,8 @@
{
"A": 8,
"B": 169,
"sigma": 0.1,
"period": 0.005,
"omega": null,
"only_rise": false
}
8 changes: 8 additions & 0 deletions benchmarks/generator/config/prompt-len-config.json
@@ -0,0 +1,8 @@
{
"A": 15,
"B": 309,
"sigma": 0.1,
"period": 0.005,
"omega": null,
"only_rise": false
}
8 changes: 8 additions & 0 deletions benchmarks/generator/config/traffic-config.json
@@ -0,0 +1,8 @@
{
"A": 2,
"B": 6,
"sigma": 0.1,
"period": 1,
"omega": null,
"only_rise": false
}
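
For intuition only: the fields in these configs (`A`, `B`, `sigma`, `period`, `omega`, `only_rise`) suggest a noisy sinusoidal pattern. The sketch below is a hypothetical interpretation of those fields — amplitude `A`, baseline `B`, relative noise `sigma`, a frequency derived from `period` when `omega` is null — and is not taken from the generator code:

```python
import numpy as np

def hypothetical_pattern(A, B, sigma, period, omega=None, only_rise=False,
                         duration_sec=600, interval_sec=1):
    """Hypothetical reading of the config: a sinusoid of amplitude A around
    baseline B, multiplied by Gaussian noise with relative std sigma."""
    t = np.arange(0, duration_sec, interval_sec, dtype=float)
    if omega is None:
        omega = 2 * np.pi / period  # assumed mapping from 'period' to angular frequency
    values = B + A * np.sin(omega * t)
    if only_rise:
        values = np.maximum.accumulate(values)  # force a non-decreasing curve
    noise = np.random.normal(loc=1.0, scale=sigma, size=t.shape)
    return np.clip(values * noise, a_min=0, a_max=None)
```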
10 changes: 7 additions & 3 deletions benchmarks/generator/sample_request.py
@@ -109,7 +109,7 @@ def sample_requests_len_range(
output_len = output_lens[i]
err_perc = initial_err_perc

while err_perc < 1:
while err_perc <= 1:
input_range = range(0, sys.maxsize)
output_range = range(0, sys.maxsize)
if input_len is not None:
@@ -126,7 +126,6 @@ def sample_requests_len_range(
(df["completion_len"] >= output_range[0]) &
(df["completion_len"] <= output_range[1])
]

if not filtered.empty:
# Select the first match or random sample
total_rows = len(filtered)
@@ -141,7 +140,12 @@ def sample_requests_len_range(
err_perc += err_step

if err_perc >= 1:
raise Exception(f"No match found for request {i + 1} even after relaxing err_perc to 0")
logging.warning(f"No match found for request {i + 1} even after relaxing err_perc to {err_perc}; falling back to a random sample")
total_rows = len(df)
sample = df.iloc[random.randint(0, total_rows - 1)]
filtered_results.append({"prompt": sample["prompt"],
"prompt_length": sample["prompt_len"],
"output_length": sample["completion_len"]})

return filtered_results

209 changes: 170 additions & 39 deletions benchmarks/generator/utils.py
@@ -5,12 +5,120 @@

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

from typing import List, Union, Any, Optional
from typing import List, Union, Any, Optional, Tuple, Dict
from transformers import (AutoTokenizer, PreTrainedTokenizer,
PreTrainedTokenizerFast)
from datetime import datetime

def convert_to_stat_df(qps_file: str,
input_file: str,
output_file: str,
internal_trace_type: str) -> pd.DataFrame:
if internal_trace_type == "maas":
# Load CSV files into DataFrames
qps_df = pd.read_csv(qps_file)
input_len_df = pd.read_csv(input_file)
output_len_df = pd.read_csv(output_file)

# Rename columns for merging and clarity
input_len_df.rename(columns={"P50": "input_len_p50", "P70": "input_len_p70", "P90": "input_len_p90", "P99": "input_len_p99"}, inplace=True)
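# Note: the completion-length trace reports P95; it is stored under the 'output_len_p90' key so prompt and output percentile columns share the same names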
output_len_df.rename(columns={"P50": "output_len_p50", "P70": "output_len_p70", "P95": "output_len_p90", "P99": "output_len_p99"}, inplace=True)
qps_df.rename(columns={"Success": "qps_success"}, inplace=True)

# Merge DataFrames on the 'Time' column (now renamed to 'timestamp')
merged_df = pd.merge(input_len_df, output_len_df, on="Time")
merged_df = pd.merge(merged_df, qps_df, on="Time")

# Drop unwanted columns (if needed)
merged_df.drop(columns=["Total", "5xx Error", "4xx Error"], inplace=True)

# Rename the 'Time' column to 'timestamp'
merged_df.rename(columns={"Time": "timestamp"}, inplace=True)

# Rearrange columns to match the desired order
merged_df = merged_df[[
"timestamp",
"input_len_p50", "input_len_p70", "input_len_p90", "input_len_p99",
"output_len_p50", "output_len_p70", "output_len_p90", "output_len_p99",
"qps_success"
]]
merged_df['timestamp'] = pd.to_datetime(merged_df['timestamp'])
elif internal_trace_type == "cloudide":
if input_file != output_file:
logging.error(f"input file {input_file} does not match output_file {output_file}")
df = pd.read_csv(input_file, parse_dates=['Time'])
df = df.replace("undefined", 0)
df['Time'] = pd.to_datetime(df['Time'], unit = 'ms') # Ensure timestamp is a datetime object
df = df.set_index('Time') # Set 'Time' as index for rolling window calculation
df_rate = pd.read_csv(qps_file, parse_dates=['Time'])
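# The rate column in the export may have an arbitrary name; normalize it to 'Rate'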
df_rate.columns.values[1] = "Rate"
df_rate = df_rate.replace("undefined", 0)
df_rate['Time'] = pd.to_datetime(df_rate['Time'], unit = 'ms')
df_rate = df_rate.set_index('Time')

sent_columns = df.filter(regex = r'^sent_bytes.rate@')
sent_columns = sent_columns.apply(pd.to_numeric, errors='coerce').fillna(0)
df['sent'] = sent_columns.sum(axis = 1)

recv_columns = df.filter(regex = r'^recv_bytes.rate@')
recv_columns = recv_columns.apply(pd.to_numeric, errors='coerce').fillna(0)
df['recv'] = recv_columns.sum(axis = 1)

df_merged = pd.merge(df, df_rate, left_index=True, right_index=True, how='outer')
df_merged = df_merged.fillna(0)
df_merged = df_merged.apply(pd.to_numeric, errors='coerce').fillna(0)

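# Approximate per-request payload sizes: bytes per interval divided by the request rate (0 when Rate is 0 to avoid division by zero)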
df_merged['sent_rate'] = df_merged.apply(lambda row : 0 if row['Rate'] == 0 else row['sent'] / row['Rate'], axis=1)
df_merged['recv_rate'] = df_merged.apply(lambda row : 0 if row['Rate'] == 0 else row['recv'] / row['Rate'], axis=1)

df_merged = df_merged.reset_index()
merged_df = pd.DataFrame({
"timestamp": df_merged['Time'],
"input_len_p50": df_merged['recv_rate'],
"input_len_p70": df_merged['recv_rate'],
"input_len_p90": df_merged['recv_rate'],
"input_len_p99": df_merged['recv_rate'],
"output_len_p50": df_merged['sent_rate'],
"output_len_p70": df_merged['sent_rate'],
"output_len_p90": df_merged['sent_rate'],
"output_len_p99": df_merged['sent_rate'],
"qps_success":df_merged['Rate'],
})
return merged_df

def read_distribution_stats(df: pd.DataFrame) -> Tuple[List[Dict], List[Dict], List[Dict]]:
time_diffs = df['timestamp'].diff().dt.total_seconds()
section_in_seconds = int(time_diffs.mean()) # Use average time difference
input_len_configs = []
output_len_configs = []
rps_configs = []
for _, row in df.iterrows():
input_len_configs.append({
"p50": float(row['input_len_p50']),
"p70": float(row['input_len_p70']),
"p90": float(row['input_len_p90']),
"p99": float(row['input_len_p99']),
"period": section_in_seconds,
"total_seconds": section_in_seconds
})
output_len_configs.append({
"p50": float(row['output_len_p50']),
"p70": float(row['output_len_p70']),
"p90": float(row['output_len_p90']),
"p99": float(row['output_len_p99']),
"period": section_in_seconds,
"total_seconds": section_in_seconds
})
rps_configs.append({
"mean_rps": float(row['qps_success']),
"amplitude": float(row['qps_success']) * 0.2, # 20% variation
"period": section_in_seconds,
"total_seconds": section_in_seconds
})
return input_len_configs, output_len_configs, rps_configs

def get_sample_interval_ms(file_path):
# Initialize variables
timestamps = []
@@ -59,50 +167,69 @@ def get_tokenizer(
trust_remote_code=trust_remote_code)


def plot_workload(workload_dict, interval_ms, output_file: str = None):
def plot_workload(workload_name: str,
workload: List[Dict[str, Any]],
bin_size_sec: int = 1,
output_dir: str = None):
"""
Plots the concurrency (item length) of the generated workload.
Plots workload statistics: total requests, prompt token count, and output token count binned by time.
Args:
workload_dict (dict): A dictionary where the keys are workload names (labels) and the values are lists of lists representing the workload.
interval_ms (int): Interval in milliseconds.
workload_name (str): Name of the workload.
workload (list of dict): Workload entries with timestamps and request details.
bin_size_sec (int): Size of each bin in seconds for aggregation.
output_dir (str, optional): Directory in which to save the plot.
"""
fig, ax = plt.subplots()
for workload_name, workload in workload_dict.items():
concurrency_values = [len(item["requests"]) for item in workload]
ax.plot(np.arange(len(concurrency_values)) * interval_ms, concurrency_values, label=workload_name)

ax.set_ylim(0, )
plt.xlabel('Time (ms)')
plt.ylabel('Concurrency')
plt.title('Workload Concurrency')
plt.legend()
if output_file is None:
plt.show()
print(f"plot_workload in directory {output_dir}")
# Convert workload data to a DataFrame
data = []
for entry in workload:
timestamp_sec = entry["timestamp"] / 1000 # Convert ms to sec
num_requests = len(entry["requests"])
total_prompt_tokens = np.mean([req["prompt_length"] for req in entry["requests"]]) if entry["requests"] else 0
total_output_tokens = np.mean([req["output_length"] for req in entry["requests"]]) if entry["requests"] else 0
data.append((timestamp_sec, num_requests, total_prompt_tokens, total_output_tokens))

df = pd.DataFrame(data, columns=["timestamp", "num_requests", "total_prompt_tokens", "total_output_tokens"])

# Define bins based on min/max timestamp
min_time, max_time = df["timestamp"].min(), df["timestamp"].max()
bins = np.arange(min_time, max_time + bin_size_sec, bin_size_sec)

# Bin the data
df["time_bin"] = pd.cut(df["timestamp"], bins, labels=bins[:-1])

# Aggregate within each bin
binned_df = df.groupby("time_bin").sum()

# Convert index back to numeric
binned_df.index = binned_df.index.astype(float)

# Plotting
fig, (ax_qps, ax_input, ax_output) = plt.subplots(3, 1, figsize=(15, 12))

ax_qps.plot(binned_df.index, binned_df["num_requests"], label="Total Requests")
ax_input.plot(binned_df.index, binned_df["total_prompt_tokens"], label="Total Prompt Tokens")
ax_output.plot(binned_df.index, binned_df["total_output_tokens"], label="Total Output Tokens")

# Formatting plots
for ax, ylabel, title in zip([ax_qps, ax_input, ax_output],
["Requests per Second", "Prompt Token Count", "Output Token Count"],
["Total Requests Sent per Second", "Total Prompt Tokens per Second", "Total Output Tokens per Second"]):
ax.set_xlabel("Time (seconds)")
ax.set_ylabel(ylabel)
ax.set_title(title)
ax.legend()

plt.tight_layout()

# Save or show the plot
if output_dir:
os.makedirs(output_dir, exist_ok=True)
plt.savefig(f"{output_dir}/{workload_name}.pdf")
logging.info(f'Saved workload plot to {output_dir}/{workload_name}.pdf')
else:
os.makedirs(os.path.dirname(output_file), exist_ok=True)
plt.savefig(f"{output_file}-traffic.pdf")
logging.info(f'Saved traffic plot to {output_file}-traffic.pdf')


fig, ax = plt.subplots()
for workload_name, workload in workload_dict.items():
input_lengths = [item["requests"][0]['prompt_length'] for item in workload]
output_lengths = [item["requests"][0]['output_length'] for item in workload]
ax.plot(np.arange(len(concurrency_values)) * interval_ms, input_lengths, label=f"{workload_name} prompt_length")
ax.plot(np.arange(len(concurrency_values)) * interval_ms, output_lengths, label=f"{workload_name} output_length")

ax.set_ylim(0, )
plt.xlabel('Time (ms)')
plt.ylabel('Lengths')
plt.title('Request Sizes')
plt.legend()
if output_file is None:
plt.show()
else:
os.makedirs(os.path.dirname(output_file), exist_ok=True)
plt.savefig(f"{output_file}-requests.pdf")
logging.info(f'Saved traffic plot to {output_file}-requests.pdf')


def save_workload(load_struct: List[Any],
@@ -133,6 +260,10 @@ def load_workload(input_path: str) -> List[Any]:
load_struct = json.load(file)
return load_struct

def load_config(config_path: str) -> Dict[str, Any]:
with open(config_path, "r") as file:
config = json.load(file)
return config

# Function to wrap the prompt into OpenAI's chat completion message format.
def wrap_prompt_as_chat_message(prompt: str):