Adding Client for Workload Generator Workload File (#501)
revert changes in 02b0ac5

revert changes in 02b0ac5

update README

Co-authored-by: Le Xu <le.xu@bytedance.com>
happyandslow and Le Xu authored Dec 6, 2024
1 parent 1eaa7c8 commit 5482dd3
Showing 5 changed files with 150 additions and 15 deletions.
32 changes: 29 additions & 3 deletions benchmarks/generator/README.md
@@ -1,6 +1,7 @@
# Generate workload file
# Using Workload Generator

## Generate a workload file based on workload patterns (synthetic patterns)
## Generate workload file
### Generate a workload file based on workload patterns (synthetic patterns)
If no trace file path is specified, the generator produces a workload file based on the 4 synthetic patterns described [here](https://github.com/aibrix/aibrix/blob/main/benchmarks/autoscaling/bench_workload_generator.py):

```
@@ -47,4 +48,29 @@ export SHARE_GPT_PATH=${PATH_TO_SHARE_GPT_FILE}
python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --interval-ms 1000 --duration-ms 3600000 --trace-type azure --trace-file "$AZURE_TRACE_NAME" --group-interval-seconds 1 --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output "output"
```

Note that the trace file contains both input and output lengths, so the dataset in ```$SHARE_GPT_PATH``` needs to be tokenized so that prompts can be sampled by their input/output token lengths; a tokenizer must therefore be specified when generating from this trace. Use ```--group-interval-seconds``` to set the grouping interval applied to the original trace. The workload file will be stored under the ```output``` folder, and a plot illustrating the workload pattern will be saved under the ```plot``` directory.
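
For reference, here is a minimal sketch (using the `load_workload` helper from `utils.py` in this directory) of how a generated workload file can be inspected; each record carries a millisecond `Timestamp` and a list of `Requests` with `Prompt`, `Prompt Length`, and `Output Length` fields:

```
from utils import load_workload  # helper in benchmarks/generator/utils.py

# Load a generated workload and look at the first timestamped batch.
workload = load_workload("output/Quick Rising.jsonl")  # path matches the client example below
first = workload[0]
print(first["Timestamp"])  # millisecond offset from the start of the run
for request in first["Requests"]:
    print(request["Prompt Length"], request["Output Length"])
```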


## Run Workload Generator

Start the vLLM server:

```
python3 -m vllm.entrypoints.openai.api_server --host 0.0.0.0 \
--port "8000" --model /root/models/deepseek-coder-6.7b-instruct \
--trust-remote-code --max-model-len "14304" \
--api-key sk-kFJ12nKsFVfVmGpj3QzX65s4RbN2xJqWzPYCjYu7wT3BlbLi \
--enable-chunked-prefill
```
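
Optionally, before launching the client, you can verify that the server's OpenAI-compatible endpoint is reachable. This is a minimal sketch, assuming the same `openai` Python package (>= 1.0) that `client.py` uses, with the endpoint and API key from the examples in this section:

```
import openai

# Quick reachability check against the vLLM OpenAI-compatible endpoint.
client = openai.OpenAI(
    api_key="sk-kFJ12nKsFVfVmGpj3QzX65s4RbN2xJqWzPYCjYu7wT3BlbLi",
    base_url="http://localhost:8000/v1",
)
print([model.id for model in client.models.list().data])
```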

Run the client against a sample workload:
```
python3 client.py --workload-path "output/Quick Rising.jsonl" \
--endpoint "http://localhost:8000" \
--model /root/models/deepseek-coder-6.7b-instruct \
--api-key sk-kFJ12nKsFVfVmGpj3QzX65s4RbN2xJqWzPYCjYu7wT3BlbLi \
--output-file-path output.jsonl
```

The output will be stored as a ```.jsonl``` file at ```output.jsonl```.
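
Each line is a JSON record with the fields written by `client.py` (`output`, `prompt_tokens`, `output_tokens`, `total_tokens`, `latency`, `throughput`). A minimal sketch of how these records could be summarized:

```
import json

# Summarize the per-request results written by client.py.
with open("output.jsonl", "r", encoding="utf-8") as f:
    results = [json.loads(line) for line in f]

latencies = [r["latency"] for r in results]
throughputs = [r["throughput"] for r in results]
print(f"requests: {len(results)}")
print(f"mean latency: {sum(latencies) / len(latencies):.2f} s")
print(f"mean throughput: {sum(throughputs) / len(throughputs):.2f} tokens/s")
```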
94 changes: 94 additions & 0 deletions benchmarks/generator/client.py
@@ -0,0 +1,94 @@
import argparse
import logging
import time
import asyncio
import openai
import json

from utils import (load_workload, wrap_prompt_as_chat_message)

# Asynchronous request handler
async def send_request(client, model, endpoint, prompt, output_file):
start_time = asyncio.get_event_loop().time()
try:
response = await client.chat.completions.create(
model = model,
messages = prompt,
temperature = 0,
max_tokens = 128
)

latency = asyncio.get_event_loop().time() - start_time
prompt_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
total_tokens = response.usage.total_tokens
throughput = output_tokens / latency
output_text = response.choices[0].message.content

result = {
"output": output_text,
"prompt_tokens": prompt_tokens,
"output_tokens": output_tokens,
"total_tokens": total_tokens,
"latency": latency,
"throughput": throughput
}

# Write result to JSONL file
output_file.write(json.dumps(result) + "\n")
output_file.flush() # Ensure data is written immediately to the file

logging.warning(
f"Request completed in {latency:.2f} seconds with throughput {throughput:.2f} tokens/s, response {response}")
return result
except Exception as e:
logging.error(f"Error sending request to at {endpoint}: {str(e)}")
return None

async def benchmark(endpoint, model, api_key, workload_path, output_file_path):
client = openai.AsyncOpenAI(
api_key=api_key,
base_url=endpoint+"/v1",
)
with open(output_file_path, 'a', encoding='utf-8') as output_file:
load_struct = load_workload(workload_path)
batch_tasks = []
base_time = time.time()
num_requests = 0
for requests_dict in load_struct:
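# Each workload entry carries a millisecond offset ("Timestamp") and a batch of requests;
# sleep until base_time + offset, then dispatch the whole batch concurrently.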
ts = int(requests_dict["Timestamp"])
requests = requests_dict["Requests"]
cur_time = time.time()
target_time = base_time + ts/1000.0
logging.warning(f"Prepare to launch {len(requests)} tasks after {target_time - cur_time}")
if target_time > cur_time:
await asyncio.sleep(target_time - cur_time)
formatted_prompts = [wrap_prompt_as_chat_message(request["Prompt"]) for request in requests]
for formatted_prompt in formatted_prompts:
task = asyncio.create_task(
send_request(client, model, endpoint, formatted_prompt, output_file)
)
batch_tasks.append(task)
num_requests += len(requests)
await asyncio.gather(*batch_tasks)
logging.warning(f"All {num_requests} requests completed for deployment.")


def main(args):
logging.info(f"Starting benchmark on endpoint {args.endpoint}")
start_time = time.time()
asyncio.run(benchmark(args.endpoint, args.model, args.api_key, args.workload_path, args.output_file_path))
end_time = time.time()
logging.info(f"Benchmark completed in {end_time - start_time:.2f} seconds")


if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Workload Generator')
parser.add_argument("--workload-path", type=str, default=None, help="File path to the workload file.")
parser.add_argument('--endpoint', type=str, required=True)
parser.add_argument("--model", type=str, required=True, help="Name of the model.")
parser.add_argument("--api-key", type=str, required=True, help="API key to the service. ")
parser.add_argument('--output-file-path', type=str, default="output.jsonl")

args = parser.parse_args()
main(args)
12 changes: 9 additions & 3 deletions benchmarks/generator/sample_request.py
@@ -52,9 +52,13 @@ def sample_sharegpt_requests(
continue
if prompt_len > 1024 or prompt_len + output_len > 2048:
continue
filtered_dataset.append((prompt, prompt_len, output_len, None))
filtered_dataset.append({"Prompt": prompt,
"Prompt Length": prompt_len,
"Output Length": output_len})
else:
filtered_dataset.append((prompt, -1, -1, None))
filtered_dataset.append({"Prompt": prompt,
"Prompt Length": -1,
"Output Length": -1})

return filtered_dataset

@@ -89,7 +93,9 @@ def sample_sharegpt_requests_len_range(
if not filtered.empty:
# Select the first match or random sample
sample = filtered.iloc[0] # Or filtered.sample(1) for random
filtered_results.append((sample["prompt"], sample["prompt_len"], sample["completion_len"], None))
filtered_results.append({"Prompt": sample["prompt"],
"Prompt Length": sample["prompt_len"],
"Output Length": sample["completion_len"]})
break # Stop relaxing for this request once a match is found

# Reduce err_perc for next iteration
23 changes: 16 additions & 7 deletions benchmarks/generator/utils.py
@@ -63,20 +63,29 @@ def save_workload(load_struct: List[Any],
for row in load_struct:
json_line = json.dumps(row) # Convert list to JSON string
file.write(json_line + "\n")
logging.warn(f'Saved workload file to {output_path + ".jsonl"}')
logging.warn(f'Saved workload file to {output_path + ".jsonl"}')
else:
with open(output_path + ".json", 'w') as file:
json.dump(load_struct, file, indent=4)
logging.warn(f'Saved workload file to {output_path + ".json"}')
logging.warn(f'Saved workload file to {output_path + ".json"}')

def load_workload(load_struct: List[Any],
input_path: str,
use_jsonl: Optional[bool] = False) -> List[Any]:
def load_workload(input_path: str) -> List[Any]:
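# Load a workload file produced by the generator: ".jsonl" files are parsed line by line,
# anything else is read as a single JSON document.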
load_struct = None
if use_jsonl:
if input_path.endswith(".jsonl"):
with open(input_path, "r") as file:
load_struct = [json.loads(line) for line in file]
else:
with open(input_path, "r") as file:
load_struct = json.load(file)
return load_struct
return load_struct

# Function to wrap the prompt into OpenAI's chat completion message format.
def wrap_prompt_as_chat_message(prompt: str):
"""
Wrap the prompt into OpenAI's chat completion message format.
:param prompt: The user prompt to be converted.
:return: A list containing chat completion messages.
"""
user_message = {"role": "user", "content": prompt}
return [user_message]
4 changes: 2 additions & 2 deletions benchmarks/generator/workload_generator.py
@@ -129,7 +129,7 @@ def pair_requests_with_prompts_round_robin(workload: List[List[Any]],
requests_with_prompts = [
prompts[request % prompt_count] for request in requests
]
paired_workload.append((ts, requests_with_prompts))
paired_workload.append({"Timestamp": ts, "Requests": requests_with_prompts})

# Save to file
save_workload(paired_workload, output_file, use_jsonl = to_jsonl)
@@ -185,7 +185,7 @@ def generate_from_azure_csv(file_path: str,
)

if sampled_requests: # Only add non-empty groups
grouped_requests.append((ts, sampled_requests))
grouped_requests.append({"Timestamp": ts, "Requests": sampled_requests})
ts += interval_ms
if ts > duration_ms:
break
