[Bugfix][Frontend] Fix Issues Under High Load With zeromq Frontend #7394

Merged · 88 commits · Aug 21, 2024

Commits
b2e29a5
added proxy to limit use of unix sockets
robertgshaw2-redhat Aug 10, 2024
8d31115
Merge branch 'main' into fix-zmq-max-sockets
robertgshaw2-redhat Aug 10, 2024
6d2b3df
comment
robertgshaw2-redhat Aug 10, 2024
c73e943
use random inproc path
robertgshaw2-redhat Aug 10, 2024
f1768fb
format
robertgshaw2-redhat Aug 10, 2024
601a461
format
robertgshaw2-redhat Aug 10, 2024
1a47d94
format
robertgshaw2-redhat Aug 10, 2024
eeecb09
Update vllm/entrypoints/openai/rpc/client.py
robertgshaw2-redhat Aug 10, 2024
2770e40
cleaning
robertgshaw2-redhat Aug 14, 2024
5a85618
Merge branch 'main' into fix-zmq-max-sockets
robertgshaw2-redhat Aug 18, 2024
938db1d
Merge branch 'fix-zmq-max-sockets' of https://github.com/neuralmagic/…
robertgshaw2-redhat Aug 18, 2024
ea2f03e
remove logging
robertgshaw2-redhat Aug 18, 2024
5cebc65
add info message re: concurrency
robertgshaw2-redhat Aug 18, 2024
2c12436
update comment
robertgshaw2-redhat Aug 18, 2024
9afd6ba
update
robertgshaw2-redhat Aug 18, 2024
c262088
format
robertgshaw2-redhat Aug 18, 2024
3e580d5
reorder
robertgshaw2-redhat Aug 18, 2024
d9e10e0
revert
robertgshaw2-redhat Aug 18, 2024
4e3a63a
fix
robertgshaw2-redhat Aug 18, 2024
e54bf8a
fix
robertgshaw2-redhat Aug 18, 2024
6544f3a
fix abort logic
robertgshaw2-redhat Aug 18, 2024
81f4da8
reduce LOC change
robertgshaw2-redhat Aug 18, 2024
b3374bc
cleanup
robertgshaw2-redhat Aug 18, 2024
dd1817a
cleanup
robertgshaw2-redhat Aug 18, 2024
5b56365
format
robertgshaw2-redhat Aug 18, 2024
05ff816
fix client
robertgshaw2-redhat Aug 18, 2024
e551d30
revert unnecessary change
robertgshaw2-redhat Aug 18, 2024
3d7f65f
revert startup probe changes to separate PR
robertgshaw2-redhat Aug 18, 2024
e7e6f1e
stash
robertgshaw2-redhat Aug 18, 2024
eaaebcc
Merge branch 'main' into fix-zmq-max-sockets
robertgshaw2-redhat Aug 18, 2024
21b5239
stash draining
robertgshaw2-redhat Aug 19, 2024
7e15b00
update
robertgshaw2-redhat Aug 19, 2024
74c4166
stash
robertgshaw2-redhat Aug 19, 2024
450e949
convert RPCServer to use DEALER
robertgshaw2-redhat Aug 19, 2024
8348f1f
stash
robertgshaw2-redhat Aug 19, 2024
545956e
fix
robertgshaw2-redhat Aug 19, 2024
7a34611
cleaning
robertgshaw2-redhat Aug 19, 2024
50abb94
stash
robertgshaw2-redhat Aug 19, 2024
1723687
remove awk
robertgshaw2-redhat Aug 19, 2024
3dfc9ef
nits
robertgshaw2-redhat Aug 20, 2024
8d40f2d
format
robertgshaw2-redhat Aug 20, 2024
3397460
format
robertgshaw2-redhat Aug 20, 2024
ef132dc
nit
robertgshaw2-redhat Aug 20, 2024
10ef204
change
robertgshaw2-redhat Aug 20, 2024
b67718f
clean
robertgshaw2-redhat Aug 20, 2024
c3c1dbe
Update vllm/entrypoints/openai/rpc/server.py
robertgshaw2-redhat Aug 20, 2024
ee6efcf
format
robertgshaw2-redhat Aug 20, 2024
3fdc2fe
cleanup abort logic
robertgshaw2-redhat Aug 20, 2024
4cacb56
nit
robertgshaw2-redhat Aug 20, 2024
724eb31
added load test
robertgshaw2-redhat Aug 21, 2024
4d5e6b7
update load test
robertgshaw2-redhat Aug 21, 2024
b9e4168
updated
robertgshaw2-redhat Aug 21, 2024
8f9bc23
format
robertgshaw2-redhat Aug 21, 2024
9a2be3f
updated
robertgshaw2-redhat Aug 21, 2024
dee38f0
revert spurious change
robertgshaw2-redhat Aug 21, 2024
e78f443
convert to even smaller model
robertgshaw2-redhat Aug 21, 2024
cc2d7db
20k requests
robertgshaw2-redhat Aug 21, 2024
b40e269
convert to 10k requests
robertgshaw2-redhat Aug 21, 2024
03eed9c
clean up closing logic
robertgshaw2-redhat Aug 21, 2024
f697226
use constant
robertgshaw2-redhat Aug 21, 2024
fd642ab
fix bad cleanup
robertgshaw2-redhat Aug 21, 2024
762c2ed
remove useless argument
robertgshaw2-redhat Aug 21, 2024
c805ed2
up to 20k requests
robertgshaw2-redhat Aug 21, 2024
2e1652e
revert to 10k requests
robertgshaw2-redhat Aug 21, 2024
3e1ede4
revert spurious argument
robertgshaw2-redhat Aug 21, 2024
b3bf7ef
revert to 20k
robertgshaw2-redhat Aug 21, 2024
708bd34
format
robertgshaw2-redhat Aug 21, 2024
10a88ec
[BugFix] Raise all exception variations in async generator
njhill Aug 20, 2024
db8aebc
Fix possible premature generator completion; add tests
njhill Aug 21, 2024
b16c64b
format
robertgshaw2-redhat Aug 21, 2024
a9ecaa9
added test accuracy
robertgshaw2-redhat Aug 21, 2024
6f8d5e8
format
robertgshaw2-redhat Aug 21, 2024
bab177f
updated test pipeline
robertgshaw2-redhat Aug 21, 2024
7b58281
fix lm eval
robertgshaw2-redhat Aug 21, 2024
adf45d1
cleanup
robertgshaw2-redhat Aug 21, 2024
9e827b0
updated
robertgshaw2-redhat Aug 21, 2024
47dca36
Merge branch 'main' into fix-zmq-max-sockets
robertgshaw2-redhat Aug 21, 2024
f84c341
added sleep time
robertgshaw2-redhat Aug 21, 2024
0ce78f8
actually sleep
robertgshaw2-redhat Aug 21, 2024
8054348
formatting
robertgshaw2-redhat Aug 21, 2024
5ddbdab
format
robertgshaw2-redhat Aug 21, 2024
1ebbe9e
mypy
robertgshaw2-redhat Aug 21, 2024
53d639b
mypy
robertgshaw2-redhat Aug 21, 2024
a36b381
format
robertgshaw2-redhat Aug 21, 2024
415ee39
remove test load
robertgshaw2-redhat Aug 21, 2024
26440e6
stash
robertgshaw2-redhat Aug 21, 2024
2442a9d
Merge branch 'fix-zmq-max-sockets' of https://github.com/neuralmagic/…
robertgshaw2-redhat Aug 21, 2024
b72f84f
Merge branch 'fix-raise-cancelled' into fix-zmq-max-sockets
robertgshaw2-redhat Aug 21, 2024
Changes from all commits
1 change: 1 addition & 0 deletions .buildkite/test-pipeline.yaml
@@ -86,6 +86,7 @@ steps:
- vllm/
commands:
- pip install -e ./plugins/vllm_add_dummy_model
- pip install git+https://github.com/EleutherAI/lm-evaluation-harness.git@a4987bba6e9e9b3f22bd3a6c1ecf0abd04fd5622#egg=lm_eval[api]
Review comment from robertgshaw2-redhat (Collaborator, Author), Aug 21, 2024:
Need to install from source since the local-completions API with support for concurrent requests is not yet in a released version of lm_eval.

- pytest -v -s entrypoints/llm
- pytest -v -s entrypoints/openai

101 changes: 88 additions & 13 deletions tests/async_engine/test_async_llm_engine.py
@@ -1,14 +1,19 @@
import asyncio
import os
from asyncio import CancelledError
from dataclasses import dataclass
from typing import Optional

import pytest
import pytest_asyncio
import torch

from vllm import SamplingParams
from vllm.config import ParallelConfig
from vllm.engine.async_llm_engine import AsyncEngineArgs, AsyncLLMEngine
from vllm.outputs import RequestOutput as RealRequestOutput

from ..conftest import cleanup
from ..utils import wait_for_gpu_memory_to_clear


@@ -118,33 +123,103 @@ async def test_new_requests_event():
os.environ.pop("VLLM_ALLOW_ENGINE_USE_RAY")


def test_asyncio_run():
def start_engine():
wait_for_gpu_memory_to_clear(
devices=list(range(torch.cuda.device_count())),
threshold_bytes=2 * 2**30,
timeout_s=60,
)

engine = AsyncLLMEngine.from_engine_args(
AsyncEngineArgs(model="facebook/opt-125m"))
return AsyncLLMEngine.from_engine_args(
AsyncEngineArgs(model="facebook/opt-125m", enforce_eager=True))


@pytest_asyncio.fixture(scope="module")
async def async_engine():
engine = await asyncio.get_event_loop().run_in_executor(executor=None,
func=start_engine)
try:
yield engine
finally:
engine.shutdown_background_loop()
del engine
await asyncio.sleep(0.1)
cleanup()


@pytest.fixture()
def should_do_global_cleanup_after_test(request) -> bool:
# So we can share the async engine fixture between these tests
return False


@pytest.mark.asyncio(scope="module")
async def test_asyncio_run(async_engine):

async def run(prompt: str):
sampling_params = SamplingParams(
temperature=0,
max_tokens=32,
)

async for output in engine.generate(prompt,
sampling_params,
request_id=prompt):
async for output in async_engine.generate(prompt,
sampling_params,
request_id=prompt):
final_output = output
return final_output

async def generate():
return await asyncio.gather(
run("test0"),
run("test1"),
)

results = asyncio.run(generate())
results = await asyncio.gather(
run("test0"),
run("test1"),
)
assert len(results) == 2


@pytest.mark.asyncio(scope="module")
async def test_cancellation(async_engine):
sampling_params = SamplingParams(
temperature=0,
min_tokens=10,
max_tokens=10,
)

i = 0
with pytest.raises(CancelledError):
async for output in async_engine.generate("test2",
sampling_params,
request_id="test2"):
assert not output.finished
i += 1
if i == 5:
await async_engine.abort("test2")

assert i == 5


@pytest.mark.asyncio(scope="module")
async def test_delayed_generator(async_engine):
sampling_params = SamplingParams(
temperature=0,
min_tokens=10,
max_tokens=10,
)

stream = async_engine.generate("test3",
sampling_params,
request_id="test3")
i = 0
final_output: Optional[RealRequestOutput] = None
async for output in stream:
final_output = output
if i == 0:
# wait for generation to complete before consuming
# the remaining messages
await asyncio.sleep(1)
if i < 9:
assert not output.finished
i += 1

assert i == 10
assert final_output is not None
assert len(final_output.outputs[0].token_ids) == 10
assert final_output.finished
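As an aside on the test plumbing above: the new tests share one engine through a module-scoped pytest-asyncio fixture and disable the per-test global cleanup via should_do_global_cleanup_after_test. A minimal, self-contained sketch of that pattern (generic names, not the vLLM fixtures) could look like this:

```python
import asyncio

import pytest
import pytest_asyncio


@pytest_asyncio.fixture(scope="module")
async def shared_engine():
    # Stand-in for an expensive-to-start resource such as AsyncLLMEngine.
    engine = {"started": True}
    yield engine
    engine["started"] = False  # teardown runs once, after the whole module


@pytest.mark.asyncio(scope="module")
async def test_first(shared_engine):
    assert shared_engine["started"]
    await asyncio.sleep(0)


@pytest.mark.asyncio(scope="module")
async def test_second(shared_engine):
    # Runs on the same event loop and sees the same fixture instance.
    assert shared_engine["started"]
```

Disabling the per-test global cleanup is what allows the single engine instance to survive across the tests in the module.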
55 changes: 55 additions & 0 deletions tests/entrypoints/openai/test_accuracy.py
@@ -0,0 +1,55 @@
"""
This file test accuracy of the vLLM server via LMEval.
It uses local-completions, which interacts with vLLM
through the OAI API with N concurrent connections.
This simulates real work usage of the API and makes
sure that the zmq frontend mp RPC message passing and
AsyncLLMEngine are working correctly.
"""

import lm_eval
import pytest

from ...utils import RemoteOpenAIServer

MODEL_NAME = "Qwen/Qwen2-1.5B-Instruct"
NUM_CONCURRENT = 500
TASK = "gsm8k"
FILTER = "exact_match,strict-match"
RTOL = 0.03
EXPECTED_VALUE = 0.58


@pytest.fixture(scope="module")
def server():
args = [
"--max-model-len", "4096", "--enable-chunked-prefill",
"--disable-log-requests", "--enforce-eager"
]

with RemoteOpenAIServer(MODEL_NAME, args) as remote_server:
yield remote_server


@pytest.fixture(scope="module")
def server_data(server):
return {
"url": f"{server.url_for('v1')}/completions",
}


def test_lm_eval_accuracy(server_data):
model_args = (f"model={MODEL_NAME},"
f"base_url={server_data['url']},"
f"num_concurrent={NUM_CONCURRENT},tokenized_requests=False")

results = lm_eval.simple_evaluate(
model="local-completions",
model_args=model_args,
tasks=TASK,
)

measured_value = results["results"][TASK][FILTER]
assert (measured_value - RTOL < EXPECTED_VALUE
and measured_value + RTOL > EXPECTED_VALUE
), f"Expected: {EXPECTED_VALUE} | Measured: {measured_value}"
26 changes: 18 additions & 8 deletions vllm/engine/async_llm_engine.py
@@ -2,8 +2,8 @@
import time
from dataclasses import dataclass
from functools import partial
from typing import (AsyncGenerator, Callable, Dict, Iterable, List, Mapping,
Optional, Set, Tuple, Type, Union)
from typing import (Any, AsyncGenerator, Callable, Dict, Iterable, List,
Mapping, Optional, Set, Tuple, Type, Union)

import torch
from typing_extensions import assert_never
@@ -85,9 +85,8 @@ def __init__(self, request_id: str, cancel: Callable[[str], None]) -> None:

def put(self, item: Union[RequestOutput, EmbeddingRequestOutput,
Exception]) -> None:
if self._finished:
return
self._queue.put_nowait(item)
if not self._finished:
self._queue.put_nowait(item)

def finish(
self,
@@ -96,7 +95,7 @@ def finish(
if not self._finished:
self._finished = True
self._queue.put_nowait(
exception if exception is not None else STOP_ITERATION)
exception if self._is_raisable(exception) else STOP_ITERATION)

@property
def finished(self) -> bool:
@@ -106,9 +105,9 @@ async def generator(
self
) -> AsyncGenerator[Union[RequestOutput, EmbeddingRequestOutput], None]:
try:
while not self._finished:
while True:
result = await self._queue.get()
if isinstance(result, Exception):
if self._is_raisable(result):
if result == STOP_ITERATION:
return
raise result
@@ -117,6 +116,12 @@
self._cancel(self.request_id)
raise asyncio.CancelledError from None

@staticmethod
def _is_raisable(value: Any):
return isinstance(value, BaseException) or \
(isinstance(value, type) and \
issubclass(value, BaseException))


class RequestTracker:
"""Synchronous abstraction for tracking requests."""
@@ -761,6 +766,11 @@ def is_stopped(self) -> bool:
def errored(self) -> bool:
return self._errored_with is not None

@property
def limit_concurrency(self) -> Optional[int]:
"""Maximum number of concurrently running requests."""
return None

def set_errored(self, exc: Exception) -> None:
self._errored_with = exc

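To make the _is_raisable change concrete, here is a small self-contained sketch (generic queue and sentinel, not the vLLM AsyncStream class) of why the stream has to treat exception classes and exception instances alike: callers may push asyncio.CancelledError itself rather than an instance of it, and the old isinstance(result, Exception) check would let that slip through as a normal output.

```python
import asyncio
from typing import Any


def is_raisable(value: Any) -> bool:
    # Accept exception instances (ValueError("boom")) as well as
    # exception classes (asyncio.CancelledError), mirroring the new check.
    return isinstance(value, BaseException) or (
        isinstance(value, type) and issubclass(value, BaseException))


async def consume(queue: "asyncio.Queue[Any]") -> list:
    outputs = []
    while True:
        item = await queue.get()
        if is_raisable(item):
            if item is StopAsyncIteration:  # stand-in for STOP_ITERATION
                return outputs
            raise item  # `raise` works for both classes and instances
        outputs.append(item)


async def main() -> None:
    queue: "asyncio.Queue[Any]" = asyncio.Queue()
    queue.put_nowait("partial output")
    queue.put_nowait(asyncio.CancelledError)  # a class, not an instance
    try:
        await consume(queue)
    except asyncio.CancelledError:
        print("stream terminated with CancelledError, as intended")


asyncio.run(main())
```

The same check also covers BaseException subclasses such as CancelledError that are not Exception subclasses on Python 3.8+, which is the "all exception variations" referred to in the commit message.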
4 changes: 4 additions & 0 deletions vllm/engine/protocol.py
@@ -29,6 +29,10 @@ def is_stopped(self) -> bool:
def errored(self) -> bool:
...

@property
def limit_concurrency(self) -> Optional[int]:
"""Maximum number of concurrently running requests."""

def generate(
self,
inputs: PromptInputs,
9 changes: 9 additions & 0 deletions vllm/entrypoints/launcher.py
@@ -27,6 +27,15 @@ async def serve_http(app: FastAPI, engine: AsyncEngineClient,

logger.info("Route: %s, Methods: %s", path, ', '.join(methods))

# Set concurrency limits in uvicorn if running in multiprocessing mode
# since zmq has a maximum socket limit of zmq.constants.SOCKET_LIMIT (65536).
if engine.limit_concurrency is not None:
logger.info(
"Launching Uvicorn with --limit_concurrency %s. To avoid this "
"limit at the expense of performance run with "
"--disable-frontend-multiprocessing", engine.limit_concurrency)
uvicorn_kwargs["limit_concurrency"] = engine.limit_concurrency

config = uvicorn.Config(app, **uvicorn_kwargs)
server = uvicorn.Server(config)
_add_shutdown_handlers(app, server, engine)
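A hedged sketch of the pattern this launcher change introduces (the app, port, and limit value below are illustrative, not vLLM's): when the engine client advertises a concurrency ceiling, passing it to uvicorn as limit_concurrency makes the HTTP layer shed excess connections with 503s instead of exhausting ZMQ sockets.

```python
from typing import Optional

import uvicorn
from fastapi import FastAPI

app = FastAPI()


@app.get("/ping")
async def ping() -> dict:
    return {"ok": True}


def launch(limit_concurrency: Optional[int]) -> None:
    kwargs: dict = {"host": "127.0.0.1", "port": 8000}
    if limit_concurrency is not None:
        # uvicorn rejects connections beyond this count with HTTP 503,
        # keeping the process under the per-context ZMQ socket ceiling.
        kwargs["limit_concurrency"] = limit_concurrency
    uvicorn.Server(uvicorn.Config(app, **kwargs)).run()


if __name__ == "__main__":
    launch(limit_concurrency=8192)  # illustrative value
```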
11 changes: 6 additions & 5 deletions vllm/entrypoints/openai/api_server.py
@@ -135,6 +135,12 @@ async def build_async_engine_client(
logger.info("Multiprocessing frontend to use %s for RPC Path.",
rpc_path)

# Build RPCClient, which conforms to AsyncEngineClient Protocol.
Review comment from the PR author (robertgshaw2-redhat): moved first, since we bind here. (Note: this shouldn't matter, but just being safe.)
# NOTE: Actually, this is not true yet. We still need to support
# embedding models via RPC (see TODO above)
rpc_client = AsyncEngineRPCClient(rpc_path)
async_engine_client = rpc_client # type: ignore

# Start RPCServer in separate process (holds the AsyncLLMEngine).
context = multiprocessing.get_context("spawn")
# the current process might have CUDA context,
@@ -145,11 +151,6 @@
rpc_server_process.start()
logger.info("Started engine process with PID %d",
rpc_server_process.pid)
# Build RPCClient, which conforms to AsyncEngineClient Protocol.
# NOTE: Actually, this is not true yet. We still need to support
# embedding models via RPC (see TODO above)
rpc_client = AsyncEngineRPCClient(rpc_path)
async_engine_client = rpc_client # type: ignore

try:
while True:
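The ordering note above is easier to see with a bare pyzmq example. This is only a sketch of the general bind-before-connect pattern (the DEALER sockets and the IPC path are illustrative, not the exact vLLM wiring): ZMQ usually tolerates either order, which matches the "shouldn't matter" comment, but creating and binding the client endpoint before spawning the server process removes any window in which the server connects to an endpoint nobody owns.

```python
import multiprocessing
import os
import tempfile

import zmq


def run_server(path: str) -> None:
    # Engine side: connects to the endpoint the client has already bound.
    sock = zmq.Context().socket(zmq.DEALER)
    sock.connect(path)
    print("server received:", sock.recv_multipart())
    sock.send_multipart([b"done"])


if __name__ == "__main__":
    path = f"ipc://{tempfile.gettempdir()}/rpc_demo_{os.getpid()}.sock"

    # Client side binds first ...
    client = zmq.Context().socket(zmq.DEALER)
    client.bind(path)

    # ... then the server process is spawned and connects back.
    proc = multiprocessing.get_context("spawn").Process(target=run_server,
                                                        args=(path, ))
    proc.start()

    client.send_multipart([b"do_work"])
    print("client received:", client.recv_multipart())
    proc.join()
```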
14 changes: 12 additions & 2 deletions vllm/entrypoints/openai/rpc/__init__.py
@@ -7,8 +7,18 @@
from vllm.prompt_adapter.request import PromptAdapterRequest
from vllm.sampling_params import SamplingParams

# Success string used for RPC instructions.
VLLM_RPC_SUCCESS_STR = "SUCCESS"
VLLM_RPC_HEALTHY_STR = "HEALTHY"

# Timeouts.
VLLM_RPC_SERVER_START_TIMEOUT_MS = 1000
VLLM_RPC_HEALTH_TIMEOUT_MS = 10000

# Minimum value of ZMQ.SOCKET_LIMIT to run mp.
VLLM_RPC_SOCKET_LIMIT_CUTOFF = 2000

# HWM is set to Infinity.
VLLM_RPC_ZMQ_HWM = 0


@dataclass
Expand All @@ -34,7 +44,7 @@ class RPCUtilityRequest(Enum):
GET_SCHEDULER_CONFIG = 5
GET_LORA_CONFIG = 6
DO_LOG_STATS = 7
CHECK_HEALTH = 8
IS_SERVER_HEALTHY = 8
IS_TRACING_ENABLED = 9


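As a rough illustration of how constants like these can be applied with pyzmq (a sketch only, not the vLLM client or server code): SOCKET_LIMIT is a read-only context option that caps how many sockets a context may create, and a high-water mark of 0 means an unbounded send/receive queue, so queued messages are never dropped under load.

```python
import zmq

VLLM_RPC_SOCKET_LIMIT_CUTOFF = 2000  # copied from the constants above
VLLM_RPC_ZMQ_HWM = 0                 # 0 == no high-water mark (unbounded)

ctx = zmq.Context()

# Refuse to run multiprocessing mode if libzmq was built with too low a cap.
socket_limit = ctx.get(zmq.SOCKET_LIMIT)
if socket_limit < VLLM_RPC_SOCKET_LIMIT_CUTOFF:
    raise RuntimeError(
        f"ZMQ SOCKET_LIMIT ({socket_limit}) is below "
        f"{VLLM_RPC_SOCKET_LIMIT_CUTOFF}; too low to multiplex RPC traffic.")

sock = ctx.socket(zmq.DEALER)
sock.set_hwm(VLLM_RPC_ZMQ_HWM)  # applies to both SNDHWM and RCVHWM
sock.connect("ipc:///tmp/example_rpc.sock")  # illustrative path
```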