Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Frontend] Optionally remove memory buffer used for uploading to URLs in run_batch #12927

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 108 additions & 15 deletions vllm/entrypoints/openai/run_batch.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0

import asyncio
import tempfile
from http import HTTPStatus
from io import StringIO
from typing import Awaitable, Callable, List, Optional
Expand Down Expand Up @@ -51,6 +52,13 @@ def parse_args():
help="The path or url to a single output file. Currently supports "
"local file paths, or web (http or https) urls. If a URL is specified,"
" the file should be available via HTTP PUT.")
parser.add_argument(
"--output-tmp-dir",
type=str,
default=None,
help="The directory to store the output file before uploading it "
"to the output URL.",
)
parser.add_argument("--response-role",
type=nullable_str,
default="assistant",
Expand Down Expand Up @@ -134,17 +142,107 @@ async def read_file(path_or_url: str) -> str:
return f.read()


async def write_file(path_or_url: str, data: str) -> None:
async def write_local_file(output_path: str,
batch_outputs: List[BatchRequestOutput]) -> None:
"""
Write the responses to a local file.
output_path: The path to write the responses to.
batch_outputs: The list of batch outputs to write.
"""
# We should make this async, but as long as run_batch runs as a
# standalone program, blocking the event loop won't effect performance.
with open(output_path, "w", encoding="utf-8") as f:
for o in batch_outputs:
print(o.model_dump_json(), file=f)


async def upload_data(output_url: str, data_or_file: str,
from_file: bool) -> None:
"""
Upload a local file to a URL.
output_url: The URL to upload the file to.
data_or_file: Either the data to upload or the path to the file to upload.
from_file: If True, data_or_file is the path to the file to upload.
"""
# Timeout is a common issue when uploading large files.
# We retry max_retries times before giving up.
max_retries = 5
# Number of seconds to wait before retrying.
delay = 5

for attempt in range(1, max_retries + 1):
try:
# We increase the timeout to 1000 seconds to allow
# for large files (default is 300).
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(
total=1000)) as session:
if from_file:
with open(data_or_file, "rb") as file:
async with session.put(output_url,
data=file) as response:
if response.status != 200:
raise Exception(f"Failed to upload file.\n"
f"Status: {response.status}\n"
f"Response: {response.text()}")
else:
async with session.put(output_url,
data=data_or_file) as response:
if response.status != 200:
raise Exception(f"Failed to upload data.\n"
f"Status: {response.status}\n"
f"Response: {response.text()}")

except Exception as e:
if attempt < max_retries:
logger.error(
f"Failed to upload data (attempt {attempt}). "
f"Error message: {str(e)}.\nRetrying in {delay} seconds..."
)
await asyncio.sleep(delay)
else:
raise Exception(f"Failed to upload data (attempt {attempt}). "
f"Error message: {str(e)}.") from e


async def write_file(path_or_url: str, batch_outputs: List[BatchRequestOutput],
output_tmp_dir: str) -> None:
"""
Write batch_outputs to a file or upload to a URL.
path_or_url: The path or URL to write batch_outputs to.
batch_outputs: The list of batch outputs to write.
output_tmp_dir: The directory to store the output file before uploading it
to the output URL.
"""
if path_or_url.startswith("http://") or path_or_url.startswith("https://"):
async with aiohttp.ClientSession() as session, \
session.put(path_or_url, data=data.encode("utf-8")):
pass
if output_tmp_dir is None:
logger.info("Writing outputs to memory buffer")
output_buffer = StringIO()
for o in batch_outputs:
print(o.model_dump_json(), file=output_buffer)
output_buffer.seek(0)
logger.info("Uploading outputs to %s", path_or_url)
await upload_data(
path_or_url,
output_buffer.read().strip().encode("utf-8"),
from_file=False,
)
else:
# Write responses to a temporary file and then upload it to the URL.
with tempfile.NamedTemporaryFile(
mode="w",
encoding="utf-8",
dir=output_tmp_dir,
prefix="tmp_batch_output_",
suffix=".jsonl",
) as f:
logger.info("Writing outputs to temporary local file %s",
f.name)
await write_local_file(f.name, batch_outputs)
logger.info("Uploading outputs to %s", path_or_url)
await upload_data(path_or_url, f.name, from_file=True)
else:
# We should make this async, but as long as this is always run as a
# standalone program, blocking the event loop won't effect performance
# in this particular case.
with open(path_or_url, "w", encoding="utf-8") as f:
f.write(data)
logger.info("Writing outputs to local file %s", path_or_url)
await write_local_file(path_or_url, batch_outputs)


def make_error_request_output(request: BatchRequestInput,
Expand Down Expand Up @@ -317,12 +415,7 @@ async def main(args):
with tracker.pbar():
responses = await asyncio.gather(*response_futures)

output_buffer = StringIO()
for response in responses:
print(response.model_dump_json(), file=output_buffer)

output_buffer.seek(0)
await write_file(args.output_file, output_buffer.read().strip())
await write_file(args.output_file, responses, args.output_tmp_dir)


if __name__ == "__main__":
Expand Down