Add request length for traces (#569)
* add request length for traces

clean up

bug fix: error bound relaxation

* add constant workload pattern

* add constant workload generator & clean up

---------

Co-authored-by: Le Xu <le.xu@bytedance.com>
happyandslow and Le Xu authored Jan 21, 2025
1 parent 1024ffb commit 1417a45
Showing 4 changed files with 175 additions and 58 deletions.
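
The commit message mentions a constant workload pattern and generator, but the diff for that fourth file is not rendered below. As a rough orientation only — the function name and parameters here are assumptions, not the commit's actual code — a constant-rate generator producing the `{"requests": [...]}` entries consumed by `plot_workload` could look like:

```python
from typing import Any, Dict, List

def generate_constant_workload(requests: List[Dict[str, Any]],
                               qps: int,
                               duration_ms: int,
                               interval_ms: int) -> List[Dict[str, Any]]:
    # Hypothetical sketch: emit a fixed number of requests per interval,
    # cycling through the sampled request pool.
    workload = []
    idx = 0
    for _ in range(duration_ms // interval_ms):
        batch = [requests[(idx + j) % len(requests)] for j in range(qps)]
        idx += qps
        workload.append({"requests": batch})
    return workload
```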
5 changes: 3 additions & 2 deletions benchmarks/generator/client.py
100644 → 100755
@@ -16,7 +16,7 @@ async def send_request(client, model, endpoint, prompt, output_file):
model=model,
messages=prompt,
temperature=0,
max_tokens=128
max_tokens=2048
)

latency = asyncio.get_event_loop().time() - start_time
@@ -27,6 +27,7 @@ async def send_request(client, model, endpoint, prompt, output_file):
output_text = response.choices[0].message.content

result = {
"input": prompt,
"output": output_text,
"prompt_tokens": prompt_tokens,
"output_tokens": output_tokens,
@@ -40,7 +41,7 @@ async def send_request(client, model, endpoint, prompt, output_file):
output_file.flush() # Ensure data is written immediately to the file

logging.warning(
f"Request completed in {latency:.2f} seconds with throughput {throughput:.2f} tokens/s, response {response}")
f"Request completed in {latency:.2f} seconds with throughput {throughput:.2f} tokens/s, request {prompt} response {response}")
return result
except Exception as e:
logging.error(f"Error sending request to at {endpoint}: {str(e)}")
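
With this change, each trace record written by `send_request` also carries the original request under `input`. A minimal usage sketch, assuming an OpenAI-compatible async client against a placeholder local endpoint and model name:

```python
import asyncio
import openai

async def main():
    # Placeholder endpoint and model; adjust to your deployment.
    client = openai.AsyncOpenAI(base_url="http://localhost:8000/v1", api_key="none")
    prompt = [{"role": "user", "content": "Hello!"}]
    with open("trace.jsonl", "a", encoding="utf-8") as output_file:
        result = await send_request(client, "my-model",
                                    "http://localhost:8000/v1", prompt, output_file)
    # Each record now includes the request itself:
    # {"input": [...], "output": "...", "prompt_tokens": ..., "output_tokens": ...}

asyncio.run(main())
```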
71 changes: 61 additions & 10 deletions benchmarks/generator/sample_request.py
@@ -1,32 +1,59 @@
import logging
import json
import sys
import random

import pandas as pd

from typing import Tuple, Optional, List
from transformers import PreTrainedTokenizerBase


def load_requests(
dataset_path: str,
tokenizer: PreTrainedTokenizerBase,
) -> pd.DataFrame:
if "ShareGPT" in dataset_path:
return load_sharegpt_requests(dataset_path, tokenizer)
else:
return load_generated_dataset(dataset_path, tokenizer)

def load_sharegpt_requests(
dataset_path: str,
tokenizer: PreTrainedTokenizerBase,
) -> pd.DataFrame:
# Load the dataset into a DataFrame
logging.warn(f"...Start dataframe transformation")
with open(dataset_path, encoding='utf-8') as f:
dataset = json.load(f)
dataset = [
(data["conversations"][0]["value"], data["conversations"][1]["value"])
for data in dataset if len(data["conversations"]) >= 2
]
df = pd.DataFrame(dataset, columns=["prompt", "completion"])
logging.warn(f"...Start dataframe transformation")
# Tokenize and calculate lengths
df["prompt_len"] = df["prompt"].apply(lambda x: len(tokenizer(x).input_ids))
df["completion_len"] = df["completion"].apply(lambda x: len(tokenizer(x).input_ids))
logging.warn(f"...Complete dataframe transformation")
return df

def load_generated_dataset(
dataset_path: str,
tokenizer: PreTrainedTokenizerBase,
) -> pd.DataFrame:
# Load the dataset into a DataFrame
with open(dataset_path, encoding='utf-8') as f:
dataset = [json.loads(line) for line in f]
# Create a DataFrame with the desired columns
logging.warn(f"...Start dataframe transformation")
df = pd.DataFrame({
'prompt': [entry['input'][0]['content'] for entry in dataset],
'completion': [entry['output'] for entry in dataset],
'prompt_len': [entry['prompt_tokens'] for entry in dataset],
'completion_len': [entry['output_tokens'] for entry in dataset]
})
logging.warn(f"...Complete dataframe transformation")
return df

def sample_sharegpt_requests(
dataset_path: str,
@@ -66,7 +93,7 @@ def sample_sharegpt_requests(
return filtered_dataset


def sample_sharegpt_requests_len_range(
def sample_requests_len_range(
df: pd.DataFrame,
num_requests: int,
input_lens: List[int],
@@ -81,14 +108,18 @@ def sample_sharegpt_requests_len_range(
input_len = input_lens[i]
output_len = output_lens[i]
err_perc = initial_err_perc
while err_perc >= 0:

while err_perc < 1:
input_range = range(0, sys.maxsize)
output_range = range(0, sys.maxsize)
if input_len is not None:
input_range = (int(input_len * err_perc), int(input_len * (1 + err_perc)))
input_range = (int(input_len * (1 - err_perc)), int(input_len * (1 + err_perc)))
else:
input_range = (0, sys.maxsize)
if output_len is not None:
output_range = (int(output_len * err_perc), int(output_len * (1 + err_perc)))

output_range = (int(output_len * (1 - err_perc)), int(output_len * (1 + err_perc)))
else:
output_range = (0, sys.maxsize)
filtered = df[
(df["prompt_len"] >= input_range[0]) &
(df["prompt_len"] <= input_range[1]) &
@@ -98,17 +129,37 @@ def sample_sharegpt_requests_len_range(

if not filtered.empty:
# Select the first match or random sample
sample = filtered.iloc[0] # Or filtered.sample(1) for random
total_rows = len(filtered)
sample = filtered.iloc[random.randint(0, total_rows - 1)]
filtered_results.append({"prompt": sample["prompt"],
"prompt_length": sample["prompt_len"],
"output_length": sample["completion_len"]})
break # Stop relaxing for this request once a match is found

# Reduce err_perc for next iteration
logging.warn(f"Relax err_perc {err_perc} by {err_step}")
err_perc -= err_step
logging.debug(f"Relax err_perc {err_perc} by {err_step} new err_perc {err_perc + err_step} input_range {input_range} output_range {output_range}")
err_perc += err_step

if err_perc < 0:
if err_perc >= 1:
raise Exception(f"No match found for request {i + 1} even after relaxing err_perc to 0")

return filtered_results


def sample_requests_all(
df: pd.DataFrame,
start_idx: int,
qps: int
) -> List[Tuple[str, int, int, None]]:
results = []

# Select the next qps rows starting at start_idx
end_idx = min(start_idx + qps, len(df))
for i in range(start_idx, end_idx):
print(f"start_idx {start_idx} end_idx {end_idx} i {i} len {len(df)} ")
row = df.iloc[i]
results.append({"prompt": row["prompt"],
"prompt_length": row["prompt_len"],
"output_length": row["completion_len"]})

return results
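
The error-bound fix in `sample_requests_len_range` above reverses the relaxation direction: `err_perc` now starts at `initial_err_perc` and grows by `err_step` toward 1, so the accepted length window `[len * (1 - err_perc), len * (1 + err_perc)]` widens until a match is found. A standalone illustration of how the window evolves, with illustrative values:

```python
input_len, err_step = 512, 0.1
err_perc = 0.2  # initial_err_perc
while err_perc < 1:
    # Window widens on both sides as err_perc grows toward 1.
    low = int(input_len * (1 - err_perc))
    high = int(input_len * (1 + err_perc))
    print(f"err_perc={err_perc:.1f} accepts prompt_len in [{low}, {high}]")
    err_perc += err_step
```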
52 changes: 49 additions & 3 deletions benchmarks/generator/utils.py
@@ -1,13 +1,39 @@
import logging
import json
import os
import csv

import numpy as np
import matplotlib.pyplot as plt

from typing import List, Union, Any, Optional
from transformers import (AutoTokenizer, PreTrainedTokenizer,
PreTrainedTokenizerFast)
from datetime import datetime

def get_sample_interval_ms(file_path):
# Initialize variables
timestamps = []

# Read the file and extract the first two timestamps
with open(file_path, 'r') as file:
reader = csv.DictReader(file)
for row in reader:
if 'Time' in row and row['Time']:
# Parse the timestamp
timestamps.append(datetime.strptime(row['Time'], "%Y-%m-%d %H:%M:%S"))
# Stop after reading the first two timestamps
if len(timestamps) == 2:
break

# Calculate the interval in milliseconds
interval = None
if len(timestamps) == 2:
interval = int((timestamps[1] - timestamps[0]).total_seconds() * 1000)
logging.info(f"Sampling interval: {interval} milliseconds")
else:
logging.error("Insufficient data to calculate the sampling interval.")
return interval


def make_serializable(data):
@@ -43,7 +69,7 @@ def plot_workload(workload_dict, interval_ms, output_file: str = None):
"""
fig, ax = plt.subplots()
for workload_name, workload in workload_dict.items():
concurrency_values = [len(item) for (_, item) in workload]
concurrency_values = [len(item["requests"]) for item in workload]
ax.plot(np.arange(len(concurrency_values)) * interval_ms, concurrency_values, label=workload_name)

ax.set_ylim(0, )
@@ -55,8 +81,28 @@ def plot_workload(workload_dict, interval_ms, output_file: str = None):
plt.show()
else:
os.makedirs(os.path.dirname(output_file), exist_ok=True)
plt.savefig(output_file)
logging.info(f'Saved workload plot to {output_file}')
plt.savefig(f"{output_file}-traffic.pdf")
logging.info(f'Saved traffic plot to {output_file}-traffic.pdf')


fig, ax = plt.subplots()
for workload_name, workload in workload_dict.items():
input_lengths = [item["requests"][0]['prompt_length'] for item in workload]
output_lengths = [item["requests"][0]['output_length'] for item in workload]
ax.plot(np.arange(len(input_lengths)) * interval_ms, input_lengths, label=f"{workload_name} prompt_length")
ax.plot(np.arange(len(output_lengths)) * interval_ms, output_lengths, label=f"{workload_name} output_length")

ax.set_ylim(0, )
plt.xlabel('Time (ms)')
plt.ylabel('Lengths')
plt.title('Request Sizes')
plt.legend()
if output_file is None:
plt.show()
else:
os.makedirs(os.path.dirname(output_file), exist_ok=True)
plt.savefig(f"{output_file}-requests.pdf")
logging.info(f'Saved request-size plot to {output_file}-requests.pdf')


def save_workload(load_struct: List[Any],
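
Taken together, the two helpers above expect a CSV trace with a `Time` column in `%Y-%m-%d %H:%M:%S` format and a workload dict whose entries each hold a `requests` list. A usage sketch with illustrative file names and values:

```python
workload_dict = {
    "constant": [
        {"requests": [{"prompt": "...", "prompt_length": 128, "output_length": 256}]},
        {"requests": [{"prompt": "...", "prompt_length": 130, "output_length": 250}]},
    ],
}
# "trace.csv" is illustrative; any CSV with a 'Time' column in
# "%Y-%m-%d %H:%M:%S" format works.
interval_ms = get_sample_interval_ms("trace.csv")
plot_workload(workload_dict, interval_ms, output_file="plots/workload")
# Writes plots/workload-traffic.pdf and plots/workload-requests.pdf
```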