Merge pull request #1519 from BerriAI/litellm_proxy_make_success_handler_non_blocking

[Feat] litellm.acompletion() make Langfuse success handler non blocking
ishaan-jaff authored Jan 19, 2024
2 parents b035527 + 8cf8da1 commit 6500360
Showing 7 changed files with 66 additions and 90 deletions.
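
For context, a minimal sketch of the call path this PR speeds up: registering Langfuse as a success callback and awaiting litellm.acompletion(). With this change the Langfuse logging work should no longer block the awaited response. The timing check and the use of mock_response here are illustrative assumptions, not part of the diff; Langfuse credentials are assumed to be set in the environment.

import asyncio
import time

import litellm

litellm.success_callback = ["langfuse"]  # assumes LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY are set

async def main():
    start = time.time()
    response = await litellm.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "This is a test"}],
        mock_response="It's simple to use and easy to get started",  # avoids a real provider call
    )
    # After this change the Langfuse success handler runs in the background,
    # so the await above returns without waiting on the logging call.
    print(response.choices[0].message.content, time.time() - start)

asyncio.run(main())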
2 changes: 2 additions & 0 deletions litellm/integrations/langfuse.py
@@ -181,6 +181,8 @@ def _log_langfuse_v2(
if supports_tags:
for key, value in metadata.items():
tags.append(f"{key}:{value}")
if "cache_hit" in kwargs:
tags.append(f"cache_hit:{kwargs['cache_hit']}")
trace_params.update({"tags": tags})

trace = self.Langfuse.trace(**trace_params)
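
The hunk above appends a cache_hit:<value> tag whenever the logging kwargs carry a cache_hit entry, so cached responses become filterable in the Langfuse UI alongside the existing metadata tags. A self-contained sketch of that tag construction (the metadata keys and trace params here are illustrative, not taken from the diff):

# Standalone illustration of the tag logic added above; values are made up.
metadata = {"generation_name": "litellm-acompletion", "trace_user_id": "user-123"}
kwargs = {"cache_hit": True}

tags = [f"{key}:{value}" for key, value in metadata.items()]
if "cache_hit" in kwargs:
    tags.append(f"cache_hit:{kwargs['cache_hit']}")

trace_params = {"name": "litellm-call"}
trace_params.update({"tags": tags})
print(trace_params["tags"])  # [..., 'cache_hit:True'] becomes a filterable Langfuse tag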
13 changes: 9 additions & 4 deletions litellm/main.py
@@ -343,6 +343,11 @@ def mock_completion(
model_response["choices"][0]["message"]["content"] = mock_response
model_response["created"] = int(time.time())
model_response["model"] = model

model_response.usage = Usage(
prompt_tokens=10, completion_tokens=20, total_tokens=30
)

return model_response

except:
@@ -534,10 +539,6 @@ def completion(
non_default_params = {
k: v for k, v in kwargs.items() if k not in default_params
} # model-specific params - pass them straight to the model/provider
if mock_response:
return mock_completion(
model, messages, stream=stream, mock_response=mock_response
)
if timeout is None:
timeout = (
kwargs.get("request_timeout", None) or 600
@@ -674,6 +675,10 @@ def completion(
optional_params=optional_params,
litellm_params=litellm_params,
)
if mock_response:
return mock_completion(
model, messages, stream=stream, mock_response=mock_response
)
if custom_llm_provider == "azure":
# azure configs
api_type = get_secret("AZURE_API_TYPE") or "azure"
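
With these two hunks, mocked completions report a fixed usage block, and the mock_response short-circuit now runs after litellm_params and optional_params are assembled, so logging callbacks see a fully populated call. A quick hedged sketch of what a caller should observe (not taken from the repo's tests):

import litellm

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "ping"}],
    mock_response="pong",  # returns immediately without contacting a provider
)
print(response.choices[0].message.content)  # "pong"
print(response.usage)  # hard-coded counts: prompt_tokens=10, completion_tokens=20, total_tokens=30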
5 changes: 3 additions & 2 deletions litellm/proxy/proxy_config.yaml
@@ -57,12 +57,13 @@ model_list:
mode: embedding
litellm_settings:
fallbacks: [{"openai-gpt-3.5": ["azure-gpt-3.5"]}]
success_callback: ['langfuse']
# cache: True
# setting callback class
# callbacks: custom_callbacks.proxy_handler_instance # sets litellm.callbacks = [proxy_handler_instance]

general_settings:
master_key: sk-1234
# general_settings:
# master_key: sk-1234
# database_type: "dynamo_db"
# database_args: { # 👈 all args - https://github.com/BerriAI/litellm/blob/befbcbb7ac8f59835ce47415c128decf37aac328/litellm/proxy/_types.py#L190
# "billing_mode": "PAY_PER_REQUEST",
10 changes: 4 additions & 6 deletions litellm/proxy/tests/load_test_completion.py
@@ -11,12 +11,10 @@ async def litellm_completion():
# Your existing code for litellm_completion goes here
try:
response = await litellm_client.chat.completions.create(
model="Azure OpenAI GPT-4 Canada-East (External)",
stream=True,
model="azure-gpt-3.5",
messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
)
async for chunk in response:
print(chunk)
print(response)
return response

except Exception as e:
@@ -27,9 +25,9 @@ async def litellm_completion():


async def main():
for i in range(1000000):
for i in range(150):
start = time.time()
n = 1000 # Number of concurrent tasks
n = 150 # Number of concurrent tasks
tasks = [litellm_completion() for _ in range(n)]

chat_completions = await asyncio.gather(*tasks)
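
The load test now exercises the proxy's azure-gpt-3.5 alias without streaming and caps concurrency at 150 requests per batch. For completeness, a hedged sketch of how the parts of this script not shown in the diff (client setup, entry point) likely fit together; the proxy URL and API key below are placeholders, not values from the repository:

import asyncio
import time
import uuid

from openai import AsyncOpenAI

# Assumed: an AsyncOpenAI client pointed at a locally running LiteLLM proxy.
litellm_client = AsyncOpenAI(base_url="http://0.0.0.0:8000", api_key="sk-1234")

async def litellm_completion():
    return await litellm_client.chat.completions.create(
        model="azure-gpt-3.5",
        messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
    )

async def main():
    start = time.time()
    tasks = [litellm_completion() for _ in range(150)]  # 150 concurrent requests
    await asyncio.gather(*tasks, return_exceptions=True)
    print(f"150 requests took {time.time() - start:.2f}s")

asyncio.run(main())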
57 changes: 0 additions & 57 deletions litellm/tests/langfuse.log
@@ -1,57 +0,0 @@
Starting new HTTPS connection (1): api.anthropic.com:443
Starting new HTTPS connection (1): litellm-logging.onrender.com:443
https://litellm-logging.onrender.com:443 "POST /logging HTTP/1.1" 200 38
https://api.anthropic.com:443 "POST /v1/complete HTTP/1.1" 200 None
Starting new HTTPS connection (1): litellm-logging.onrender.com:443
Request options: {'method': 'post', 'url': '/chat/completions', 'files': None, 'json_data': {'messages': [{'role': 'user', 'content': 'this is a streaming test for llama2 + langfuse'}], 'model': 'gpt-3.5-turbo', 'max_tokens': 20, 'stream': True, 'temperature': 0.2}}
connect_tcp.started host='api.openai.com' port=443 local_address=None timeout=600.0 socket_options=None
connect_tcp.complete return_value=<httpcore._backends.sync.SyncStream object at 0x1090f92d0>
start_tls.started ssl_context=<ssl.SSLContext object at 0x108ddf020> server_hostname='api.openai.com' timeout=600.0
start_tls.complete return_value=<httpcore._backends.sync.SyncStream object at 0x1090f9290>
send_request_headers.started request=<Request [b'POST']>
send_request_headers.complete
send_request_body.started request=<Request [b'POST']>
send_request_body.complete
receive_response_headers.started request=<Request [b'POST']>
https://litellm-logging.onrender.com:443 "POST /logging HTTP/1.1" 200 38
receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', [(b'Date', b'Sat, 23 Dec 2023 06:33:00 GMT'), (b'Content-Type', b'text/event-stream'), (b'Transfer-Encoding', b'chunked'), (b'Connection', b'keep-alive'), (b'access-control-allow-origin', b'*'), (b'Cache-Control', b'no-cache, must-revalidate'), (b'openai-model', b'gpt-3.5-turbo-0613'), (b'openai-organization', b'reliablekeystest'), (b'openai-processing-ms', b'62'), (b'openai-version', b'2020-10-01'), (b'strict-transport-security', b'max-age=15724800; includeSubDomains'), (b'x-ratelimit-limit-requests', b'9000'), (b'x-ratelimit-limit-tokens', b'1000000'), (b'x-ratelimit-limit-tokens_usage_based', b'1000000'), (b'x-ratelimit-remaining-requests', b'8998'), (b'x-ratelimit-remaining-tokens', b'999967'), (b'x-ratelimit-remaining-tokens_usage_based', b'999967'), (b'x-ratelimit-reset-requests', b'6ms'), (b'x-ratelimit-reset-tokens', b'1ms'), (b'x-ratelimit-reset-tokens_usage_based', b'1ms'), (b'x-request-id', b'dd1029a85edecb986fb662945c9f7b4f'), (b'CF-Cache-Status', b'DYNAMIC'), (b'Set-Cookie', b'__cf_bm=dnuSnc6BPNJd4lgWKpv3iE2P5zy4r5aCVekXVi7HG7U-1703313180-1-AbeMpAfvmJ6BShULb7tMaErR5ergUrt6ohiXj1e8zoo9AotZ0Jz0alUSUcp8FXyQX2VQ9P6gBUeoSR9aE98OasU=; path=/; expires=Sat, 23-Dec-23 07:03:00 GMT; domain=.api.openai.com; HttpOnly; Secure; SameSite=None'), (b'Set-Cookie', b'_cfuvid=dET0GKSNfbtSWNJuXndP8GY8M0ANzDK4Dl7mvIfhmM0-1703313180257-0-604800000; path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None'), (b'Server', b'cloudflare'), (b'CF-RAY', b'839e920e4f47f4b0-BOM'), (b'alt-svc', b'h3=":443"; ma=86400')])
HTTP Request: POST https://api.openai.com/v1/chat/completions "200 OK"
receive_response_body.started request=<Request [b'POST']>
receive_response_body.complete
response_closed.started
response_closed.complete
Starting new HTTPS connection (1): litellm-logging.onrender.com:443
Request options: {'method': 'post', 'url': '/chat/completions', 'files': None, 'json_data': {'messages': [{'role': 'user', 'content': "What's the weather like in San Francisco, Tokyo, and Paris?"}], 'model': 'gpt-3.5-turbo-1106', 'tool_choice': 'auto', 'tools': [{'type': 'function', 'function': {'name': 'get_current_weather', 'description': 'Get the current weather in a given location', 'parameters': {'type': 'object', 'properties': {'location': {'type': 'string', 'description': 'The city and state, e.g. San Francisco, CA'}, 'unit': {'type': 'string', 'enum': ['celsius', 'fahrenheit']}}, 'required': ['location']}}}]}}
connect_tcp.started host='api.openai.com' port=443 local_address=None timeout=600.0 socket_options=None
connect_tcp.complete return_value=<httpcore._backends.sync.SyncStream object at 0x10972d410>
start_tls.started ssl_context=<ssl.SSLContext object at 0x1090c5be0> server_hostname='api.openai.com' timeout=600.0
start_tls.complete return_value=<httpcore._backends.sync.SyncStream object at 0x1097547d0>
send_request_headers.started request=<Request [b'POST']>
send_request_headers.complete
send_request_body.started request=<Request [b'POST']>
send_request_body.complete
receive_response_headers.started request=<Request [b'POST']>
https://litellm-logging.onrender.com:443 "POST /logging HTTP/1.1" 200 38
receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', [(b'Date', b'Sat, 23 Dec 2023 06:33:03 GMT'), (b'Content-Type', b'application/json'), (b'Transfer-Encoding', b'chunked'), (b'Connection', b'keep-alive'), (b'access-control-allow-origin', b'*'), (b'Cache-Control', b'no-cache, must-revalidate'), (b'openai-model', b'gpt-3.5-turbo-1106'), (b'openai-organization', b'reliablekeystest'), (b'openai-processing-ms', b'2145'), (b'openai-version', b'2020-10-01'), (b'strict-transport-security', b'max-age=15724800; includeSubDomains'), (b'x-ratelimit-limit-requests', b'9000'), (b'x-ratelimit-limit-tokens', b'1000000'), (b'x-ratelimit-limit-tokens_usage_based', b'1000000'), (b'x-ratelimit-remaining-requests', b'8998'), (b'x-ratelimit-remaining-tokens', b'999968'), (b'x-ratelimit-remaining-tokens_usage_based', b'999968'), (b'x-ratelimit-reset-requests', b'6ms'), (b'x-ratelimit-reset-tokens', b'1ms'), (b'x-ratelimit-reset-tokens_usage_based', b'1ms'), (b'x-request-id', b'd0fd54d3a7696ee677f3690e9e0d6d04'), (b'CF-Cache-Status', b'DYNAMIC'), (b'Set-Cookie', b'__cf_bm=P_4fUmw4vvrbGKTlavf9VWuuzzro87gvhLE0DEGKA84-1703313183-1-ARgz+AQXAzH1uTTK8iyPE3QnT8TovAP61UvYsFD+d5DWM0lFi5U2+eSgPH+Pqt+Y1fNH1FWBUn9DmVceJKvyLcU=; path=/; expires=Sat, 23-Dec-23 07:03:03 GMT; domain=.api.openai.com; HttpOnly; Secure; SameSite=None'), (b'Set-Cookie', b'_cfuvid=g.nvBthte.6BJ7KHg5tihyGwupeGfMNMGnw72QUUBQc-1703313183034-0-604800000; path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None'), (b'Server', b'cloudflare'), (b'CF-RAY', b'839e92128b7ff2e2-BOM'), (b'Content-Encoding', b'gzip'), (b'alt-svc', b'h3=":443"; ma=86400')])
receive_response_body.started request=<Request [b'POST']>
receive_response_body.complete
response_closed.started
response_closed.complete
HTTP Request: POST https://api.openai.com/v1/chat/completions "200 OK"
nction': {'name': 'get_current_weather', 'description': 'Get the current weather in a given location', 'parameters': {'type': 'object', 'properties': {'location': {'type': 'string', 'description': 'The city and state, e.g. San Francisco, CA'}, 'unit': {'type': 'string', 'enum': ['celsius', 'fahrenheit']}}, 'required': ['location']}}}]}}
connect_tcp.started host='api.openai.com' port=443 local_address=None timeout=600.0 socket_options=None
connect_tcp.complete return_value=<httpcore._backends.sync.SyncStream object at 0x10972d410>
start_tls.started ssl_context=<ssl.SSLContext object at 0x1090c5be0> server_hostname='api.openai.com' timeout=600.0
start_tls.complete return_value=<httpcore._backends.sync.SyncStream object at 0x1097547d0>
send_request_headers.started request=<Request [b'POST']>
send_request_headers.complete
send_request_body.started request=<Request [b'POST']>
send_request_body.complete
receive_response_headers.started request=<Request [b'POST']>
https://litellm-logging.onrender.com:443 "POST /logging HTTP/1.1" 200 38
receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', [(b'Date', b'Sat, 23 Dec 2023 06:33:03 GMT'), (b'Content-Type', b'application/json'), (b'Transfer-Encoding', b'chunked'), (b'Connection', b'keep-alive'), (b'access-control-allow-origin', b'*'), (b'Cache-Control', b'no-cache, must-revalidate'), (b'openai-model', b'gpt-3.5-turbo-1106'), (b'openai-organization', b'reliablekeystest'), (b'openai-processing-ms', b'2145'), (b'openai-version', b'2020-10-01'), (b'strict-transport-security', b'max-age=15724800; includeSubDomains'), (b'x-ratelimit-limit-requests', b'9000'), (b'x-ratelimit-limit-tokens', b'1000000'), (b'x-ratelimit-limit-tokens_usage_based', b'1000000'), (b'x-ratelimit-remaining-requests', b'8998'), (b'x-ratelimit-remaining-tokens', b'999968'), (b'x-ratelimit-remaining-tokens_usage_based', b'999968'), (b'x-ratelimit-reset-requests', b'6ms'), (b'x-ratelimit-reset-tokens', b'1ms'), (b'x-ratelimit-reset-tokens_usage_based', b'1ms'), (b'x-request-id', b'd0fd54d3a7696ee677f3690e9e0d6d04'), (b'CF-Cache-Status', b'DYNAMIC'), (b'Set-Cookie', b'__cf_bm=P_4fUmw4vvrbGKTlavf9VWuuzzro87gvhLE0DEGKA84-1703313183-1-ARgz+AQXAzH1uTTK8iyPE3QnT8TovAP61UvYsFD+d5DWM0lFi5U2+eSgPH+Pqt+Y1fNH1FWBUn9DmVceJKvyLcU=; path=/; expires=Sat, 23-Dec-23 07:03:03 GMT; domain=.api.openai.com; HttpOnly; Secure; SameSite=None'), (b'Set-Cookie', b'_cfuvid=g.nvBthte.6BJ7KHg5tihyGwupeGfMNMGnw72QUUBQc-1703313183034-0-604800000; path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None'), (b'Server', b'cloudflare'), (b'CF-RAY', b'839e92128b7ff2e2-BOM'), (b'Content-Encoding', b'gzip'), (b'alt-svc', b'h3=":443"; ma=86400')])
receive_response_body.started request=<Request [b'POST']>
receive_response_body.complete
response_closed.started
response_closed.complete
HTTP Request: POST https://api.openai.com/v1/chat/completions "200 OK"
62 changes: 47 additions & 15 deletions litellm/tests/test_alangfuse.py
@@ -99,34 +99,66 @@ def pre_langfuse_setup():
return


@pytest.mark.skip(reason="beta test - checking langfuse output")
def test_langfuse_logging_async():
# this tests time added to make langfuse logging calls, vs just acompletion calls
try:
pre_langfuse_setup()
litellm.set_verbose = True

# Make 5 calls with an empty success_callback
litellm.success_callback = []
start_time_empty_callback = asyncio.run(make_async_calls())
print("done with no callback test")

print("starting langfuse test")
# Make 5 calls with success_callback set to "langfuse"
litellm.success_callback = ["langfuse"]
start_time_langfuse = asyncio.run(make_async_calls())
print("done with langfuse test")

# Compare the time for both scenarios
print(f"Time taken with success_callback='langfuse': {start_time_langfuse}")
print(f"Time taken with empty success_callback: {start_time_empty_callback}")

# assert the diff is not more than 1 second - this was 5 seconds before the fix
assert abs(start_time_langfuse - start_time_empty_callback) < 1

except litellm.Timeout as e:
pass
except Exception as e:
pytest.fail(f"An exception occurred - {e}")

async def _test_langfuse():
response = await litellm.acompletion(

async def make_async_calls():
tasks = []
for _ in range(5):
task = asyncio.create_task(
litellm.acompletion(
model="azure/chatgpt-v-2",
messages=[{"role": "user", "content": "This is a test"}],
max_tokens=100,
max_tokens=5,
temperature=0.7,
timeout=5,
user="test_user",
user="langfuse_latency_test_user",
mock_response="It's simple to use and easy to get started",
)
await asyncio.sleep(1)
return response
)
tasks.append(task)

response = asyncio.run(_test_langfuse())
print(f"response: {response}")
# Measure the start time before running the tasks
start_time = asyncio.get_event_loop().time()

# # check langfuse.log to see if there was a failed response
search_logs("langfuse.log")
except litellm.Timeout as e:
pass
except Exception as e:
pytest.fail(f"An exception occurred - {e}")
# Wait for all tasks to complete
responses = await asyncio.gather(*tasks)

# Print the responses when tasks return
for idx, response in enumerate(responses):
print(f"Response from Task {idx + 1}: {response}")

# Calculate the total time taken
total_time = asyncio.get_event_loop().time() - start_time

return total_time


# def test_langfuse_logging_async_text_completion():
7 changes: 1 addition & 6 deletions litellm/utils.py
@@ -1864,12 +1864,6 @@ def function_setup(
# we only support async s3 logging for acompletion/aembedding since that's used on proxy
litellm._async_success_callback.append(callback)
removed_async_items.append(index)
elif callback == "langfuse" and inspect.iscoroutinefunction(
original_function
):
# use async success callback for langfuse if this is litellm.acompletion(). Streaming logging does not work otherwise
litellm._async_success_callback.append(callback)
removed_async_items.append(index)

# Pop the async items from success_callback in reverse order to avoid index issues
for index in reversed(removed_async_items):
@@ -2363,6 +2357,7 @@ async def wrapper_async(*args, **kwargs):
threading.Thread(
target=logging_obj.success_handler, args=(result, start_time, end_time)
).start()

# RETURN RESULT
if hasattr(result, "_hidden_params"):
result._hidden_params["model_id"] = kwargs.get("model_info", {}).get(
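
As best as can be read from these hunks, Langfuse is no longer force-routed through _async_success_callback for coroutine calls; instead it goes through the regular success_handler, which wrapper_async already fires on a background thread. A toy sketch (not litellm's actual code) of why that thread hand-off keeps the awaited call non-blocking:

import asyncio
import threading
import time

def slow_success_handler(result, start_time, end_time):
    time.sleep(2)  # stands in for a network call to Langfuse
    print("logged:", result)

async def acompletion_like():
    result = "fake completion"
    start_time = end_time = time.time()
    # Fire-and-forget: logging runs on its own thread while the caller already has the result.
    threading.Thread(
        target=slow_success_handler, args=(result, start_time, end_time)
    ).start()
    return result

async def main():
    t0 = time.time()
    await acompletion_like()
    print(f"returned after {time.time() - t0:.3f}s")  # well under the 2s logging delay

asyncio.run(main())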
