Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feat] litellm.acompletion() make Langfuse success handler non blocking #1519

Merged
merged 9 commits into from
Jan 19, 2024
5 changes: 3 additions & 2 deletions litellm/proxy/proxy_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ model_list:
mode: embedding
litellm_settings:
fallbacks: [{"openai-gpt-3.5": ["azure-gpt-3.5"]}]
success_callback: ['langfuse']
# cache: True
# setting callback class
# callbacks: custom_callbacks.proxy_handler_instance # sets litellm.callbacks = [proxy_handler_instance]

general_settings:
master_key: sk-1234
# general_settings:
# master_key: sk-1234
# database_type: "dynamo_db"
# database_args: { # 👈 all args - https://github.com/BerriAI/litellm/blob/befbcbb7ac8f59835ce47415c128decf37aac328/litellm/proxy/_types.py#L190
# "billing_mode": "PAY_PER_REQUEST",
Expand Down
10 changes: 4 additions & 6 deletions litellm/proxy/tests/load_test_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ async def litellm_completion():
# Your existing code for litellm_completion goes here
try:
response = await litellm_client.chat.completions.create(
model="Azure OpenAI GPT-4 Canada-East (External)",
stream=True,
model="azure-gpt-3.5",
messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
)
async for chunk in response:
print(chunk)
print(response)
return response

except Exception as e:
Expand All @@ -27,9 +25,9 @@ async def litellm_completion():


async def main():
for i in range(1000000):
for i in range(150):
start = time.time()
n = 1000 # Number of concurrent tasks
n = 150 # Number of concurrent tasks
tasks = [litellm_completion() for _ in range(n)]

chat_completions = await asyncio.gather(*tasks)
Expand Down
49 changes: 49 additions & 0 deletions litellm/tests/test_async_callbacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import json
import sys
import os
import io, asyncio

import logging

logging.basicConfig(level=logging.DEBUG)
sys.path.insert(0, os.path.abspath("../.."))

from litellm import completion
import litellm

litellm.num_retries = 3
import time
import pytest


async def custom_callback(
    kwargs,  # kwargs passed to completion
    completion_response,  # response object returned from completion
    start_time,
    end_time,  # start/end time of the call
    sleep_seconds=1.0,  # simulated handler latency; default preserves original timing
):
    """Async success callback that logs call details and simulates a slow handler.

    Prints the completion kwargs, response, and timing, then sleeps for
    ``sleep_seconds`` to emulate an expensive callback (e.g. a logging
    integration). Used to verify that the success handler does not block
    the caller.
    """
    print("LITELLM: in custom callback function")
    print("kwargs", kwargs)
    print("completion_response", completion_response)
    print("start_time", start_time)
    print("end_time", end_time)
    # BUG FIX: the original used time.sleep(1), which blocks the event loop
    # inside an async callback and defeats the purpose of a non-blocking
    # handler. asyncio.sleep yields control while "working".
    await asyncio.sleep(sleep_seconds)

    return


def test_time_to_run_10_completions():
    """Measure wall-clock time for 10 concurrent ``litellm.acompletion`` calls.

    With a non-blocking success callback the total time should be close to a
    single completion's latency; a blocking handler would serialize the
    per-call callback sleeps and inflate this measurement roughly 10x.
    """
    litellm.callbacks = [custom_callback]
    start = time.time()

    async def _run_ten():
        # Fire all completions concurrently so only callback blocking — not
        # request serialization — can stretch the measured duration.
        tasks = [
            litellm.acompletion(
                model="gpt-3.5-turbo", messages=[{"role": "user", "content": "hello"}]
            )
            for _ in range(10)
        ]
        await asyncio.gather(*tasks)

    # BUG FIX: the original issued exactly one completion while the function
    # name and the printed message both claimed ten.
    asyncio.run(_run_ten())
    end = time.time()
    print(f"Time to run 10 completions: {end - start}")


test_time_to_run_10_completions()
10 changes: 7 additions & 3 deletions litellm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2357,12 +2357,16 @@ async def wrapper_async(*args, **kwargs):
print_verbose(
f"Async Wrapper: Completed Call, calling async_success_handler: {logging_obj.async_success_handler}"
)
asyncio.create_task(
logging_obj.async_success_handler(result, start_time, end_time)
)
# asyncio.to_thread(
# logging_obj.async_success_handler(result, start_time, end_time)
# )
threading.Thread(
target=logging_obj.success_handler, args=(result, start_time, end_time)
).start()
threading.Thread(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ishaan-jaff how does switching from create_task to threads solve the issue?

i'm also concerned about creating too many threads here, which would cause issues in high-traffic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@krrishdholakia I updated this PR's conversation with notes

target=logging_obj.async_success_handler,
args=(result, start_time, end_time),
).start()
# RETURN RESULT
if hasattr(result, "_hidden_params"):
result._hidden_params["model_id"] = kwargs.get("model_info", {}).get(
Expand Down