Add benchmark for completion pipeline #1414

Merged
26 changes: 18 additions & 8 deletions examples/llm/completion/pipeline.py
@@ -19,6 +19,7 @@

from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.io.deserializers import read_file_to_df
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.llm_generate_node import LLMGenerateNode
@@ -71,8 +72,13 @@ def _build_engine(llm_service: str):
return engine


def pipeline(num_threads: int, pipeline_batch_size: int, model_max_batch_size: int, repeat_count: int,
llm_service: str) -> float:
def pipeline(num_threads: int,
pipeline_batch_size: int,
model_max_batch_size: int,
repeat_count: int,
llm_service: str,
input_file: str = None,
shuffle: bool = False) -> float:

config = Config()

@@ -83,8 +89,10 @@ def pipeline(num_threads: int, pipeline_batch_size: int, model_max_batch_size: i
config.mode = PipelineModes.NLP
config.edge_buffer_size = 128

source_dfs = [
cudf.DataFrame({
if input_file is not None:
source_df = read_file_to_df(input_file, df_type='cudf')
else:
source_df = cudf.DataFrame({
"country": [
"France",
"Spain",
@@ -98,13 +106,15 @@ def pipeline(num_threads: int, pipeline_batch_size: int, model_max_batch_size: i
"United States",
]
})
]

if shuffle:
source_df = source_df.sample(frac=1, ignore_index=True)

completion_task = {"task_type": "completion", "task_dict": {"input_keys": ["country"], }}

pipe = LinearPipeline(config)

pipe.set_source(InMemorySourceStage(config, dataframes=source_dfs, repeat=repeat_count))
pipe.set_source(InMemorySourceStage(config, dataframes=[source_df], repeat=repeat_count))

pipe.add_stage(
DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task))
@@ -113,10 +123,10 @@ def pipeline(num_threads: int, pipeline_batch_size: int, model_max_batch_size: i

pipe.add_stage(LLMEngineStage(config, engine=_build_engine(llm_service=llm_service)))

sink = pipe.add_stage(InMemorySinkStage(config))

pipe.add_stage(MonitorStage(config, description="Inference rate", unit="req", delayed_start=True))

sink = pipe.add_stage(InMemorySinkStage(config))

start_time = time.time()

pipe.run()
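For context, a minimal sketch (not part of the diff) of driving the updated `pipeline()` signature directly from Python. The import path, argument values, and the `./countries.csv` path are assumptions for illustration; any file readable by `read_file_to_df` with a `country` column should behave the same way.

```python
from examples.llm.completion.pipeline import pipeline  # hypothetical import path

# Hypothetical invocation: read countries from a CSV instead of the built-in
# ten-country DataFrame, and shuffle the row order before running.
elapsed = pipeline(num_threads=8,
                   pipeline_batch_size=1024,
                   model_max_batch_size=64,
                   repeat_count=10,
                   llm_service="NemoLLM",
                   input_file="./countries.csv",
                   shuffle=True)
print(elapsed)  # the function is annotated to return a float
```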
12 changes: 12 additions & 0 deletions examples/llm/completion/run.py
@@ -55,6 +55,18 @@ def run():
default="NemoLLM",
type=click.Choice(['NemoLLM', 'OpenAI'], case_sensitive=False),
help="LLM service to issue requests to.")
@click.option(
"--input_file",
type=click.Path(exists=True, readable=True),
default=None,
required=False,
help="Input to read country names from, if undefined an in-memory DataFrame of ten countris will be used.")
@click.option(
"--shuffle",
is_flag=True,
default=False,
help=("Random shuffle order of country names."),
)
def pipeline(**kwargs):
from .pipeline import pipeline as _pipeline

2 changes: 1 addition & 1 deletion tests/benchmarks/README.md
@@ -105,7 +105,7 @@ Benchmarks for an individual workflow can be run using the following:
```bash
cd tests/benchmarks

pytest -s --benchmark-enable --benchmark-warmup=on --benchmark-warmup-iterations=1 --benchmark-autosave test_bench_e2e_pipelines.py::<test-workflow>
pytest -s --run_benchmark --benchmark-enable --benchmark-warmup=on --benchmark-warmup-iterations=1 --benchmark-autosave test_bench_e2e_pipelines.py::<test-workflow>
```
The `-s` option allows outputs of pipeline execution to be displayed so you can ensure there are no errors while running your benchmarks.

43 changes: 42 additions & 1 deletion tests/benchmarks/conftest.py
@@ -13,16 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import glob
import os
import typing
from unittest import mock

import GPUtil
import pytest
from test_bench_e2e_pipelines import E2E_TEST_CONFIGS


# pylint: disable=unused-argument
def pytest_benchmark_update_json(config, benchmarks, output_json):

gpus = GPUtil.getGPUs()

for i, gpu in enumerate(gpus):
@@ -37,6 +40,8 @@ def pytest_benchmark_update_json(config, benchmarks, output_json):
output_json["machine_info"]["gpu_" + str(i)]["uuid"] = gpu.uuid

for bench in output_json['benchmarks']:
if bench["name"] not in E2E_TEST_CONFIGS:
continue

line_count = 0
byte_count = 0
@@ -70,3 +75,39 @@ def pytest_benchmark_update_json(config, benchmarks, output_json):
bench['stats']['max_throughput_bytes'] = (byte_count * repeat) / bench['stats']['min']
bench['stats']['mean_throughput_bytes'] = (byte_count * repeat) / bench['stats']['mean']
bench['stats']['median_throughput_bytes'] = (byte_count * repeat) / bench['stats']['median']


@pytest.mark.usefixtures("openai")
@pytest.fixture(name="mock_chat_completion")
def mock_chat_completion_fixture(mock_chat_completion: mock.MagicMock):

async def sleep_first(*args, **kwargs):
# Sleep time is based on average request time
await asyncio.sleep(1.265)
return mock.DEFAULT

mock_chat_completion.acreate.side_effect = sleep_first

yield mock_chat_completion


@pytest.mark.usefixtures("nemollm")
@pytest.fixture(name="mock_nemollm")
def mock_nemollm_fixture(mock_nemollm: mock.MagicMock):
# The generate function is a blocking call that returns a future when return_type="async"

async def sleep_first(fut: asyncio.Future, value: typing.Any = mock.DEFAULT):
# Sleep time is based on average request time
await asyncio.sleep(0.412)
fut.set_result(value)

def create_future(*args, **kwargs) -> asyncio.Future:
event_loop = asyncio.get_event_loop()
fut = event_loop.create_future()
event_loop.create_task(sleep_first(fut, mock.DEFAULT))
return fut

mock_nemollm.generate.side_effect = create_future

yield mock_nemollm
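To make the `mock_nemollm.generate` side effect above easier to follow, here is a small self-contained sketch (not from the PR) of the same future-returning pattern; the prompt string and result payload are placeholders.

```python
import asyncio
from unittest import mock


async def main() -> None:
    mock_nemollm = mock.MagicMock()  # stand-in for the patched NemoLLM client

    async def resolve_later(fut: asyncio.Future) -> None:
        await asyncio.sleep(0.412)  # simulated average request latency
        fut.set_result(["test_output"])

    def create_future(*args, **kwargs) -> asyncio.Future:
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        loop.create_task(resolve_later(fut))
        return fut

    mock_nemollm.generate.side_effect = create_future

    # generate() mimics the real client with return_type="async": it returns
    # immediately with a future that resolves after the simulated delay.
    result = await mock_nemollm.generate("What is the capital of France?", return_type="async")
    print(result)  # ['test_output']


asyncio.run(main())
```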
87 changes: 87 additions & 0 deletions tests/benchmarks/test_bench_completion_pipeline.py
@@ -0,0 +1,87 @@
# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections.abc
import typing

import pytest

import cudf

from _utils.dataset_manager import DatasetManager
from morpheus.config import Config
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.llm_generate_node import LLMGenerateNode
from morpheus.llm.nodes.prompt_template_node import PromptTemplateNode
from morpheus.llm.services.llm_service import LLMService
from morpheus.llm.services.nemo_llm_service import NeMoLLMService
from morpheus.llm.services.openai_chat_service import OpenAIChatService
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage


def _build_engine(llm_service_cls: LLMService, model_name: str = "test_model"):
llm_service = llm_service_cls()
llm_client = llm_service.get_client(model_name=model_name)

engine = LLMEngine()
engine.add_node("extracter", node=ExtracterNode())
engine.add_node("prompts",
inputs=["/extracter"],
node=PromptTemplateNode(template="What is the capital of {{country}}?", template_format="jinja"))
engine.add_node("completion", inputs=["/prompts"], node=LLMGenerateNode(llm_client=llm_client))
engine.add_task_handler(inputs=["/completion"], handler=SimpleTaskHandler())

return engine


def _run_pipeline(config: Config,
llm_service_cls: LLMService,
source_df: cudf.DataFrame,
model_name: str = "test_model") -> dict:
"""
Loosely patterned after `examples/llm/completion`
"""
completion_task = {"task_type": "completion", "task_dict": {"input_keys": ["country"]}}

pipe = LinearPipeline(config)

pipe.set_source(InMemorySourceStage(config, dataframes=[source_df]))

pipe.add_stage(
DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task))

pipe.add_stage(LLMEngineStage(config, engine=_build_engine(llm_service_cls, model_name=model_name)))
pipe.add_stage(InMemorySinkStage(config))

pipe.run()


@pytest.mark.use_cudf
@pytest.mark.use_python
@pytest.mark.benchmark
@pytest.mark.usefixtures("mock_nemollm", "mock_chat_completion")
@pytest.mark.parametrize("llm_service_cls", [NeMoLLMService, OpenAIChatService])
def test_completion_pipe(benchmark: collections.abc.Callable[[collections.abc.Callable], typing.Any],
config: Config,
dataset: DatasetManager,
llm_service_cls: LLMService):
benchmark(_run_pipeline, config, llm_service_cls, source_df=dataset["countries.csv"])
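The `benchmark` argument is pytest-benchmark's fixture; the following stand-alone toy test (not part of this PR) illustrates its calling convention.

```python
# Requires the pytest-benchmark plugin (provides the `benchmark` fixture).
def test_sum_benchmark(benchmark):
    def work() -> int:
        return sum(range(1000))

    # benchmark(callable, *args, **kwargs) calls the target repeatedly and
    # records timing statistics, just as test_completion_pipe does with
    # _run_pipeline above.
    result = benchmark(work)
    assert result == 499500
```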
51 changes: 51 additions & 0 deletions tests/conftest.py
@@ -25,10 +25,12 @@
import types
import typing
import warnings
from unittest import mock

import pytest
import requests

from _utils import import_or_skip
from _utils.kafka import _init_pytest_kafka
from _utils.kafka import kafka_bootstrap_servers_fixture # noqa: F401 pylint:disable=unused-import
from _utils.kafka import kafka_consumer_fixture # noqa: F401 pylint:disable=unused-import
@@ -1007,3 +1009,52 @@ def idx_part_collection_config_fixture():
def simple_collection_config_fixture():
from _utils import load_json_file
yield load_json_file(filename="service/milvus_simple_collection_conf.json")


@pytest.fixture(name="nemollm", scope='session')
def nemollm_fixture(fail_missing: bool):
"""
Fixture to ensure nemollm is installed
"""
skip_reason = ("Tests for the NeMoLLMService require the nemollm package to be installed, to install this run:\n"
"`mamba install -n base -c conda-forge conda-merge`\n"
"`conda run -n base --live-stream conda-merge docker/conda/environments/cuda${CUDA_VER}_dev.yml "
" docker/conda/environments/cuda${CUDA_VER}_examples.yml"
" > .tmp/merged.yml && mamba env update -n morpheus --file .tmp/merged.yml`")
yield import_or_skip("nemollm", reason=skip_reason, fail_missing=fail_missing)


@pytest.fixture(name="openai", scope='session')
def openai_fixture(fail_missing: bool):
"""
Fixture to ensure openai is installed
"""
skip_reason = ("Tests for the OpenAIChatService require the openai package to be installed, to install this run:\n"
"`mamba install -n base -c conda-forge conda-merge`\n"
"`conda run -n base --live-stream conda-merge docker/conda/environments/cuda${CUDA_VER}_dev.yml "
" docker/conda/environments/cuda${CUDA_VER}_examples.yml"
" > .tmp/merged.yml && mamba env update -n morpheus --file .tmp/merged.yml`")
yield import_or_skip("openai", reason=skip_reason, fail_missing=fail_missing)


@pytest.mark.usefixtures("openai")
@pytest.fixture(name="mock_chat_completion")
def mock_chat_completion_fixture():
with mock.patch("openai.ChatCompletion") as mock_chat_completion:
mock_chat_completion.return_value = mock_chat_completion

response = {'choices': [{'message': {'content': 'test_output'}}]}
mock_chat_completion.create.return_value = response.copy()
mock_chat_completion.acreate = mock.AsyncMock(return_value=response.copy())
yield mock_chat_completion


@pytest.mark.usefixtures("nemollm")
@pytest.fixture(name="mock_nemollm")
def mock_nemollm_fixture():
with mock.patch("nemollm.NemoLLM") as mock_nemollm:
mock_nemollm.return_value = mock_nemollm
mock_nemollm.generate_multiple.return_value = ["test_output"]
mock_nemollm.post_process_generate_response.return_value = {"text": "test_output"}

yield mock_nemollm
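As an illustration only, a hypothetical test consuming the relocated `mock_chat_completion` fixture might look like the sketch below; it assumes the pre-1.0 `openai.ChatCompletion` interface that the fixture patches and that the `openai` package is importable.

```python
from unittest import mock


def test_chat_completion_is_mocked(mock_chat_completion: mock.MagicMock):
    import openai  # openai.ChatCompletion is patched for the duration of the test

    response = openai.ChatCompletion.create(
        model="test_model",
        messages=[{"role": "user", "content": "What is the capital of France?"}])

    assert response['choices'][0]['message']['content'] == 'test_output'
    mock_chat_completion.create.assert_called_once()
```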
52 changes: 0 additions & 52 deletions tests/llm/conftest.py
@@ -13,63 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest import mock

import pytest

from _utils import import_or_skip
from _utils import require_env_variable


@pytest.fixture(name="nemollm", scope='session')
def nemollm_fixture(fail_missing: bool):
"""
Fixture to ensure nemollm is installed
"""
skip_reason = ("Tests for the NeMoLLMService require the nemollm package to be installed, to install this run:\n"
"`mamba install -n base -c conda-forge conda-merge`\n"
"`conda run -n base --live-stream conda-merge docker/conda/environments/cuda${CUDA_VER}_dev.yml "
" docker/conda/environments/cuda${CUDA_VER}_examples.yml"
" > .tmp/merged.yml && mamba env update -n morpheus --file .tmp/merged.yml`")
yield import_or_skip("nemollm", reason=skip_reason, fail_missing=fail_missing)


@pytest.fixture(name="openai", scope='session')
def openai_fixture(fail_missing: bool):
"""
Fixture to ensure openai is installed
"""
skip_reason = ("Tests for the OpenAIChatService require the openai package to be installed, to install this run:\n"
"`mamba install -n base -c conda-forge conda-merge`\n"
"`conda run -n base --live-stream conda-merge docker/conda/environments/cuda${CUDA_VER}_dev.yml "
" docker/conda/environments/cuda${CUDA_VER}_examples.yml"
" > .tmp/merged.yml && mamba env update -n morpheus --file .tmp/merged.yml`")
yield import_or_skip("openai", reason=skip_reason, fail_missing=fail_missing)


@pytest.mark.usefixtures("openai")
@pytest.fixture(name="mock_chat_completion")
def mock_chat_completion_fixture():
with mock.patch("openai.ChatCompletion") as mock_chat_completion:
mock_chat_completion.return_value = mock_chat_completion

response = {'choices': [{'message': {'content': 'test_output'}}]}
mock_chat_completion.create.return_value = response.copy()
mock_chat_completion.acreate = mock.AsyncMock(return_value=response.copy())
yield mock_chat_completion


@pytest.mark.usefixtures("nemollm")
@pytest.fixture(name="mock_nemollm")
def mock_nemollm_fixture():
with mock.patch("nemollm.NemoLLM") as mock_nemollm:
mock_nemollm.return_value = mock_nemollm
mock_nemollm.generate_multiple.return_value = ["test_output"]
mock_nemollm.post_process_generate_response.return_value = {"text": "test_output"}

yield mock_nemollm


@pytest.fixture(name="countries")
def countries_fixture():
yield [