[Feat] litellm.acompletion(): make Langfuse success handler non-blocking #1519

Merged · 9 commits · Jan 19, 2024
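At a glance, the goal is that litellm.acompletion() returns without waiting on the Langfuse success handler. A minimal usage sketch (illustrative only; it assumes LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY are set in the environment and uses mock_response so no provider key is needed):

```python
import asyncio
import litellm

# Route successful calls to Langfuse (reads LANGFUSE_* env vars).
litellm.success_callback = ["langfuse"]

async def main():
    response = await litellm.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "ping"}],
        mock_response="pong",  # keeps the sketch self-contained
    )
    # After this PR, the Langfuse handler is dispatched off the awaited path
    # (see the litellm/utils.py hunk below), so logging latency does not
    # delay this result.
    print(response)

asyncio.run(main())
```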
2 changes: 2 additions & 0 deletions litellm/integrations/langfuse.py
@@ -181,6 +181,8 @@ def _log_langfuse_v2(
if supports_tags:
    for key, value in metadata.items():
        tags.append(f"{key}:{value}")
    if "cache_hit" in kwargs:
        tags.append(f"cache_hit:{kwargs['cache_hit']}")
    trace_params.update({"tags": tags})

trace = self.Langfuse.trace(**trace_params)
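The two added lines surface cache hits as a Langfuse tag. A small illustration of the resulting tag list (the metadata and kwargs values here are made up):

```python
# Mirrors the tag-building logic added above; illustrative values only.
metadata = {"generation_name": "quickstart"}
kwargs = {"cache_hit": True}

tags = [f"{key}:{value}" for key, value in metadata.items()]
if "cache_hit" in kwargs:
    tags.append(f"cache_hit:{kwargs['cache_hit']}")

print(tags)  # ['generation_name:quickstart', 'cache_hit:True']
```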
13 changes: 9 additions & 4 deletions litellm/main.py
@@ -343,6 +343,11 @@ def mock_completion(
model_response["choices"][0]["message"]["content"] = mock_response
model_response["created"] = int(time.time())
model_response["model"] = model

model_response.usage = Usage(
prompt_tokens=10, completion_tokens=20, total_tokens=30
)

return model_response

except:
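With the usage block added above, mocked completions report token counts instead of omitting them. A quick check (the 10/20/30 figures are the hard-coded placeholders from the diff, not real counts):

```python
import litellm

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "hi"}],
    mock_response="This is a mocked reply",
)
# Expect roughly: Usage(prompt_tokens=10, completion_tokens=20, total_tokens=30)
print(response.usage)
```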
@@ -534,10 +539,6 @@ def completion(
non_default_params = {
    k: v for k, v in kwargs.items() if k not in default_params
}  # model-specific params - pass them straight to the model/provider
if mock_response:
    return mock_completion(
        model, messages, stream=stream, mock_response=mock_response
    )
if timeout is None:
    timeout = (
        kwargs.get("request_timeout", None) or 600
@@ -674,6 +675,10 @@ def completion(
    optional_params=optional_params,
    litellm_params=litellm_params,
)
if mock_response:
    return mock_completion(
        model, messages, stream=stream, mock_response=mock_response
    )
if custom_llm_provider == "azure":
    # azure configs
    api_type = get_secret("AZURE_API_TYPE") or "azure"
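Relocating the mock_response short-circuit below the Logging setup means a mocked call only returns after the Logging object has been populated with the call's parameters, which is what the mocked Langfuse latency test further down relies on. A hedged sketch using a plain Python callback (the (kwargs, response, start, end) shape follows litellm's custom-callback convention; the function name is hypothetical):

```python
import litellm

def on_success(kwargs, completion_response, start_time, end_time):
    # Hypothetical callback, used only to show that mocked calls still drive
    # the success path with populated call metadata.
    print("success callback fired for model:", kwargs.get("model"))

litellm.success_callback = [on_success]

litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "hello"}],
    mock_response="mocked reply",
)
```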
5 changes: 3 additions & 2 deletions litellm/proxy/proxy_config.yaml
@@ -57,12 +57,13 @@ model_list:
      mode: embedding
litellm_settings:
  fallbacks: [{"openai-gpt-3.5": ["azure-gpt-3.5"]}]
  success_callback: ['langfuse']
  # cache: True
  # setting callback class
  # callbacks: custom_callbacks.proxy_handler_instance # sets litellm.callbacks = [proxy_handler_instance]

general_settings:
  master_key: sk-1234
# general_settings:
#   master_key: sk-1234
#   database_type: "dynamo_db"
#   database_args: { # 👈 all args - https://github.com/BerriAI/litellm/blob/befbcbb7ac8f59835ce47415c128decf37aac328/litellm/proxy/_types.py#L190
#     "billing_mode": "PAY_PER_REQUEST",
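For success_callback: ['langfuse'] to take effect, the process also needs Langfuse credentials. A sketch of the SDK-side equivalent (the LANGFUSE_* variable names are the ones the Langfuse integration conventionally reads; the values are placeholders):

```python
import os
import litellm

# Placeholder credentials; use real Langfuse project keys in practice.
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."
# os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"  # optional self-host override

# SDK equivalent of the proxy_config.yaml setting above.
litellm.success_callback = ["langfuse"]
```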
10 changes: 4 additions & 6 deletions litellm/proxy/tests/load_test_completion.py
@@ -11,12 +11,10 @@ async def litellm_completion():
    # Your existing code for litellm_completion goes here
    try:
        response = await litellm_client.chat.completions.create(
            model="Azure OpenAI GPT-4 Canada-East (External)",
            stream=True,
            model="azure-gpt-3.5",
            messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
        )
        async for chunk in response:
            print(chunk)
        print(response)
        return response

    except Exception as e:
@@ -27,9 +25,9 @@ async def litellm_completion():


async def main():
    for i in range(1000000):
    for i in range(150):
        start = time.time()
        n = 1000  # Number of concurrent tasks
        n = 150  # Number of concurrent tasks
        tasks = [litellm_completion() for _ in range(n)]

        chat_completions = await asyncio.gather(*tasks)
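The litellm_client setup sits above this hunk and is not shown in the diff; a typical way to point an AsyncOpenAI client at a locally running LiteLLM proxy (assumed for illustration, not taken from the file) would be:

```python
from openai import AsyncOpenAI

# Assumed values; base_url and api_key must match the proxy being load-tested.
litellm_client = AsyncOpenAI(base_url="http://0.0.0.0:8000", api_key="sk-1234")
```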
57 changes: 0 additions & 57 deletions litellm/tests/langfuse.log
@@ -1,57 +0,0 @@
Starting new HTTPS connection (1): api.anthropic.com:443
Starting new HTTPS connection (1): litellm-logging.onrender.com:443
https://litellm-logging.onrender.com:443 "POST /logging HTTP/1.1" 200 38
https://api.anthropic.com:443 "POST /v1/complete HTTP/1.1" 200 None
Starting new HTTPS connection (1): litellm-logging.onrender.com:443
Request options: {'method': 'post', 'url': '/chat/completions', 'files': None, 'json_data': {'messages': [{'role': 'user', 'content': 'this is a streaming test for llama2 + langfuse'}], 'model': 'gpt-3.5-turbo', 'max_tokens': 20, 'stream': True, 'temperature': 0.2}}
connect_tcp.started host='api.openai.com' port=443 local_address=None timeout=600.0 socket_options=None
connect_tcp.complete return_value=<httpcore._backends.sync.SyncStream object at 0x1090f92d0>
start_tls.started ssl_context=<ssl.SSLContext object at 0x108ddf020> server_hostname='api.openai.com' timeout=600.0
start_tls.complete return_value=<httpcore._backends.sync.SyncStream object at 0x1090f9290>
send_request_headers.started request=<Request [b'POST']>
send_request_headers.complete
send_request_body.started request=<Request [b'POST']>
send_request_body.complete
receive_response_headers.started request=<Request [b'POST']>
https://litellm-logging.onrender.com:443 "POST /logging HTTP/1.1" 200 38
receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', [(b'Date', b'Sat, 23 Dec 2023 06:33:00 GMT'), (b'Content-Type', b'text/event-stream'), (b'Transfer-Encoding', b'chunked'), (b'Connection', b'keep-alive'), (b'access-control-allow-origin', b'*'), (b'Cache-Control', b'no-cache, must-revalidate'), (b'openai-model', b'gpt-3.5-turbo-0613'), (b'openai-organization', b'reliablekeystest'), (b'openai-processing-ms', b'62'), (b'openai-version', b'2020-10-01'), (b'strict-transport-security', b'max-age=15724800; includeSubDomains'), (b'x-ratelimit-limit-requests', b'9000'), (b'x-ratelimit-limit-tokens', b'1000000'), (b'x-ratelimit-limit-tokens_usage_based', b'1000000'), (b'x-ratelimit-remaining-requests', b'8998'), (b'x-ratelimit-remaining-tokens', b'999967'), (b'x-ratelimit-remaining-tokens_usage_based', b'999967'), (b'x-ratelimit-reset-requests', b'6ms'), (b'x-ratelimit-reset-tokens', b'1ms'), (b'x-ratelimit-reset-tokens_usage_based', b'1ms'), (b'x-request-id', b'dd1029a85edecb986fb662945c9f7b4f'), (b'CF-Cache-Status', b'DYNAMIC'), (b'Set-Cookie', b'__cf_bm=dnuSnc6BPNJd4lgWKpv3iE2P5zy4r5aCVekXVi7HG7U-1703313180-1-AbeMpAfvmJ6BShULb7tMaErR5ergUrt6ohiXj1e8zoo9AotZ0Jz0alUSUcp8FXyQX2VQ9P6gBUeoSR9aE98OasU=; path=/; expires=Sat, 23-Dec-23 07:03:00 GMT; domain=.api.openai.com; HttpOnly; Secure; SameSite=None'), (b'Set-Cookie', b'_cfuvid=dET0GKSNfbtSWNJuXndP8GY8M0ANzDK4Dl7mvIfhmM0-1703313180257-0-604800000; path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None'), (b'Server', b'cloudflare'), (b'CF-RAY', b'839e920e4f47f4b0-BOM'), (b'alt-svc', b'h3=":443"; ma=86400')])
HTTP Request: POST https://api.openai.com/v1/chat/completions "200 OK"
receive_response_body.started request=<Request [b'POST']>
receive_response_body.complete
response_closed.started
response_closed.complete
Starting new HTTPS connection (1): litellm-logging.onrender.com:443
Request options: {'method': 'post', 'url': '/chat/completions', 'files': None, 'json_data': {'messages': [{'role': 'user', 'content': "What's the weather like in San Francisco, Tokyo, and Paris?"}], 'model': 'gpt-3.5-turbo-1106', 'tool_choice': 'auto', 'tools': [{'type': 'function', 'function': {'name': 'get_current_weather', 'description': 'Get the current weather in a given location', 'parameters': {'type': 'object', 'properties': {'location': {'type': 'string', 'description': 'The city and state, e.g. San Francisco, CA'}, 'unit': {'type': 'string', 'enum': ['celsius', 'fahrenheit']}}, 'required': ['location']}}}]}}
connect_tcp.started host='api.openai.com' port=443 local_address=None timeout=600.0 socket_options=None
connect_tcp.complete return_value=<httpcore._backends.sync.SyncStream object at 0x10972d410>
start_tls.started ssl_context=<ssl.SSLContext object at 0x1090c5be0> server_hostname='api.openai.com' timeout=600.0
start_tls.complete return_value=<httpcore._backends.sync.SyncStream object at 0x1097547d0>
send_request_headers.started request=<Request [b'POST']>
send_request_headers.complete
send_request_body.started request=<Request [b'POST']>
send_request_body.complete
receive_response_headers.started request=<Request [b'POST']>
https://litellm-logging.onrender.com:443 "POST /logging HTTP/1.1" 200 38
receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', [(b'Date', b'Sat, 23 Dec 2023 06:33:03 GMT'), (b'Content-Type', b'application/json'), (b'Transfer-Encoding', b'chunked'), (b'Connection', b'keep-alive'), (b'access-control-allow-origin', b'*'), (b'Cache-Control', b'no-cache, must-revalidate'), (b'openai-model', b'gpt-3.5-turbo-1106'), (b'openai-organization', b'reliablekeystest'), (b'openai-processing-ms', b'2145'), (b'openai-version', b'2020-10-01'), (b'strict-transport-security', b'max-age=15724800; includeSubDomains'), (b'x-ratelimit-limit-requests', b'9000'), (b'x-ratelimit-limit-tokens', b'1000000'), (b'x-ratelimit-limit-tokens_usage_based', b'1000000'), (b'x-ratelimit-remaining-requests', b'8998'), (b'x-ratelimit-remaining-tokens', b'999968'), (b'x-ratelimit-remaining-tokens_usage_based', b'999968'), (b'x-ratelimit-reset-requests', b'6ms'), (b'x-ratelimit-reset-tokens', b'1ms'), (b'x-ratelimit-reset-tokens_usage_based', b'1ms'), (b'x-request-id', b'd0fd54d3a7696ee677f3690e9e0d6d04'), (b'CF-Cache-Status', b'DYNAMIC'), (b'Set-Cookie', b'__cf_bm=P_4fUmw4vvrbGKTlavf9VWuuzzro87gvhLE0DEGKA84-1703313183-1-ARgz+AQXAzH1uTTK8iyPE3QnT8TovAP61UvYsFD+d5DWM0lFi5U2+eSgPH+Pqt+Y1fNH1FWBUn9DmVceJKvyLcU=; path=/; expires=Sat, 23-Dec-23 07:03:03 GMT; domain=.api.openai.com; HttpOnly; Secure; SameSite=None'), (b'Set-Cookie', b'_cfuvid=g.nvBthte.6BJ7KHg5tihyGwupeGfMNMGnw72QUUBQc-1703313183034-0-604800000; path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None'), (b'Server', b'cloudflare'), (b'CF-RAY', b'839e92128b7ff2e2-BOM'), (b'Content-Encoding', b'gzip'), (b'alt-svc', b'h3=":443"; ma=86400')])
receive_response_body.started request=<Request [b'POST']>
receive_response_body.complete
response_closed.started
response_closed.complete
HTTP Request: POST https://api.openai.com/v1/chat/completions "200 OK"
nction': {'name': 'get_current_weather', 'description': 'Get the current weather in a given location', 'parameters': {'type': 'object', 'properties': {'location': {'type': 'string', 'description': 'The city and state, e.g. San Francisco, CA'}, 'unit': {'type': 'string', 'enum': ['celsius', 'fahrenheit']}}, 'required': ['location']}}}]}}
connect_tcp.started host='api.openai.com' port=443 local_address=None timeout=600.0 socket_options=None
connect_tcp.complete return_value=<httpcore._backends.sync.SyncStream object at 0x10972d410>
start_tls.started ssl_context=<ssl.SSLContext object at 0x1090c5be0> server_hostname='api.openai.com' timeout=600.0
start_tls.complete return_value=<httpcore._backends.sync.SyncStream object at 0x1097547d0>
send_request_headers.started request=<Request [b'POST']>
send_request_headers.complete
send_request_body.started request=<Request [b'POST']>
send_request_body.complete
receive_response_headers.started request=<Request [b'POST']>
https://litellm-logging.onrender.com:443 "POST /logging HTTP/1.1" 200 38
receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', [(b'Date', b'Sat, 23 Dec 2023 06:33:03 GMT'), (b'Content-Type', b'application/json'), (b'Transfer-Encoding', b'chunked'), (b'Connection', b'keep-alive'), (b'access-control-allow-origin', b'*'), (b'Cache-Control', b'no-cache, must-revalidate'), (b'openai-model', b'gpt-3.5-turbo-1106'), (b'openai-organization', b'reliablekeystest'), (b'openai-processing-ms', b'2145'), (b'openai-version', b'2020-10-01'), (b'strict-transport-security', b'max-age=15724800; includeSubDomains'), (b'x-ratelimit-limit-requests', b'9000'), (b'x-ratelimit-limit-tokens', b'1000000'), (b'x-ratelimit-limit-tokens_usage_based', b'1000000'), (b'x-ratelimit-remaining-requests', b'8998'), (b'x-ratelimit-remaining-tokens', b'999968'), (b'x-ratelimit-remaining-tokens_usage_based', b'999968'), (b'x-ratelimit-reset-requests', b'6ms'), (b'x-ratelimit-reset-tokens', b'1ms'), (b'x-ratelimit-reset-tokens_usage_based', b'1ms'), (b'x-request-id', b'd0fd54d3a7696ee677f3690e9e0d6d04'), (b'CF-Cache-Status', b'DYNAMIC'), (b'Set-Cookie', b'__cf_bm=P_4fUmw4vvrbGKTlavf9VWuuzzro87gvhLE0DEGKA84-1703313183-1-ARgz+AQXAzH1uTTK8iyPE3QnT8TovAP61UvYsFD+d5DWM0lFi5U2+eSgPH+Pqt+Y1fNH1FWBUn9DmVceJKvyLcU=; path=/; expires=Sat, 23-Dec-23 07:03:03 GMT; domain=.api.openai.com; HttpOnly; Secure; SameSite=None'), (b'Set-Cookie', b'_cfuvid=g.nvBthte.6BJ7KHg5tihyGwupeGfMNMGnw72QUUBQc-1703313183034-0-604800000; path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None'), (b'Server', b'cloudflare'), (b'CF-RAY', b'839e92128b7ff2e2-BOM'), (b'Content-Encoding', b'gzip'), (b'alt-svc', b'h3=":443"; ma=86400')])
receive_response_body.started request=<Request [b'POST']>
receive_response_body.complete
response_closed.started
response_closed.complete
HTTP Request: POST https://api.openai.com/v1/chat/completions "200 OK"
62 changes: 47 additions & 15 deletions litellm/tests/test_alangfuse.py
@@ -99,34 +99,66 @@ def pre_langfuse_setup():
    return


@pytest.mark.skip(reason="beta test - checking langfuse output")
def test_langfuse_logging_async():
    # this tests time added to make langfuse logging calls, vs just acompletion calls
    try:
        pre_langfuse_setup()
        litellm.set_verbose = True

        # Make 5 calls with an empty success_callback
        litellm.success_callback = []
        start_time_empty_callback = asyncio.run(make_async_calls())
        print("done with no callback test")

        print("starting langfuse test")
        # Make 5 calls with success_callback set to "langfuse"
        litellm.success_callback = ["langfuse"]
        start_time_langfuse = asyncio.run(make_async_calls())
        print("done with langfuse test")

        # Compare the time for both scenarios
        print(f"Time taken with success_callback='langfuse': {start_time_langfuse}")
        print(f"Time taken with empty success_callback: {start_time_empty_callback}")

        # assert the diff is not more than 1 second - this was 5 seconds before the fix
        assert abs(start_time_langfuse - start_time_empty_callback) < 1

    except litellm.Timeout as e:
        pass
    except Exception as e:
        pytest.fail(f"An exception occurred - {e}")

async def _test_langfuse():
response = await litellm.acompletion(

async def make_async_calls():
tasks = []
for _ in range(5):
task = asyncio.create_task(
litellm.acompletion(
model="azure/chatgpt-v-2",
messages=[{"role": "user", "content": "This is a test"}],
max_tokens=100,
max_tokens=5,
temperature=0.7,
timeout=5,
user="test_user",
user="langfuse_latency_test_user",
mock_response="It's simple to use and easy to get started",
)
await asyncio.sleep(1)
return response
)
tasks.append(task)

response = asyncio.run(_test_langfuse())
print(f"response: {response}")
# Measure the start time before running the tasks
start_time = asyncio.get_event_loop().time()

# # check langfuse.log to see if there was a failed response
search_logs("langfuse.log")
except litellm.Timeout as e:
pass
except Exception as e:
pytest.fail(f"An exception occurred - {e}")
# Wait for all tasks to complete
responses = await asyncio.gather(*tasks)

# Print the responses when tasks return
for idx, response in enumerate(responses):
print(f"Response from Task {idx + 1}: {response}")

# Calculate the total time taken
total_time = asyncio.get_event_loop().time() - start_time

return total_time


# def test_langfuse_logging_async_text_completion():
7 changes: 1 addition & 6 deletions litellm/utils.py
@@ -1864,12 +1864,6 @@ def function_setup(
        # we only support async s3 logging for acompletion/aembedding since that's used on proxy
        litellm._async_success_callback.append(callback)
        removed_async_items.append(index)
    elif callback == "langfuse" and inspect.iscoroutinefunction(
        original_function
    ):
        # use async success callback for langfuse if this is litellm.acompletion(). Streaming logging does not work otherwise
        litellm._async_success_callback.append(callback)
        removed_async_items.append(index)

# Pop the async items from success_callback in reverse order to avoid index issues
for index in reversed(removed_async_items):
@@ -2363,6 +2357,7 @@ async def wrapper_async(*args, **kwargs):
threading.Thread(
    target=logging_obj.success_handler, args=(result, start_time, end_time)
).start()

# RETURN RESULT
if hasattr(result, "_hidden_params"):
    result._hidden_params["model_id"] = kwargs.get("model_info", {}).get(
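Taken together, the two utils.py changes stop routing "langfuse" onto the async success-callback list for coroutine calls; it stays a regular success callback, which wrapper_async hands to a background thread as shown above. A stripped-down sketch of that fire-and-forget pattern (names and timings are illustrative, not from the diff):

```python
import asyncio
import threading
import time

def slow_success_handler(result):
    # Stand-in for a logging integration that does network I/O (e.g. Langfuse).
    time.sleep(2)
    print("logged:", result)

async def acompletion_like():
    await asyncio.sleep(0.1)  # stand-in for the actual LLM call
    result = "response"
    # Fire-and-forget: the handler runs off the event loop thread, so the
    # awaited caller is not held up by logging latency.
    threading.Thread(target=slow_success_handler, args=(result,)).start()
    return result

async def main():
    start = time.perf_counter()
    print(await acompletion_like())
    print(f"returned in {time.perf_counter() - start:.2f}s (logging still in flight)")

asyncio.run(main())
```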