Skip to content

Commit

Permalink
feat(openai): parse usage if stream_options has include_usage (#987)
Browse files Browse the repository at this point in the history
  • Loading branch information
hassiebp authored Nov 4, 2024
1 parent 9976ade commit 3dd4254
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 10 deletions.
48 changes: 42 additions & 6 deletions langfuse/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ def get_langfuse_args(self):
return {**self.args, **self.kwargs}

def get_openai_args(self):
    """Return the kwargs to forward to the OpenAI SDK call.

    OpenAI only reports token usage on streamed responses when
    ``stream_options={"include_usage": True}`` is passed, so for streaming
    calls we opt in on the caller's behalf — unless the caller already
    chose a value for ``include_usage`` themselves, which we leave intact.
    """
    if self.kwargs.get("stream"):
        stream_options = self.kwargs.setdefault("stream_options", {})
        stream_options.setdefault("include_usage", True)

    return self.kwargs


Expand Down Expand Up @@ -371,7 +382,11 @@ def _get_langfuse_data_from_kwargs(


def _create_langfuse_update(
completion, generation: StatefulGenerationClient, completion_start_time, model=None
completion,
generation: StatefulGenerationClient,
completion_start_time,
model=None,
usage=None,
):
update = {
"end_time": _get_timestamp(),
Expand All @@ -381,6 +396,9 @@ def _create_langfuse_update(
if model is not None:
update["model"] = model

if usage is not None:
update["usage"] = usage

generation.update(**update)


Expand All @@ -393,6 +411,7 @@ def _extract_streamed_openai_response(resource, chunks):
chunk = chunk.__dict__

model = model or chunk.get("model", None) or None
usage = chunk.get("usage", None)

choices = chunk.get("choices", [])

Expand Down Expand Up @@ -491,6 +510,7 @@ def get_response_for_chat():
return (
model,
get_response_for_chat() if resource.type == "chat" else completion,
usage.__dict__ if _is_openai_v1() and usage is not None else usage,
)


Expand Down Expand Up @@ -519,7 +539,11 @@ def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, respons

usage = response.get("usage", None)

return model, completion, usage.__dict__ if _is_openai_v1() and usage is not None else usage
return (
model,
completion,
usage.__dict__ if _is_openai_v1() and usage is not None else usage,
)


def _is_openai_v1():
Expand Down Expand Up @@ -793,14 +817,20 @@ def __exit__(self, exc_type, exc_value, traceback):
pass

def _finalize(self):
    """Record the fully assembled streamed response on the generation.

    Collapses the buffered stream chunks into (model, completion, usage),
    mirrors the completion onto the trace, and closes out the generation
    with the end-of-stream data.
    """
    model, completion, usage = _extract_streamed_openai_response(
        self.resource, self.items
    )

    # Leave the trace untouched when its id was supplied by the user;
    # we only own traces we created ourselves.
    if not self.is_nested_trace:
        self.langfuse.trace(id=self.generation.trace_id, output=completion)

    _create_langfuse_update(
        completion,
        self.generation,
        self.completion_start_time,
        model=model,
        usage=usage,
    )


Expand Down Expand Up @@ -857,14 +887,20 @@ async def __aexit__(self, exc_type, exc_value, traceback):
pass

async def _finalize(self):
    """Record the fully assembled streamed response on the generation (async).

    Collapses the buffered stream chunks into (model, completion, usage),
    mirrors the completion onto the trace, and closes out the generation
    with the end-of-stream data.
    """
    model, completion, usage = _extract_streamed_openai_response(
        self.resource, self.items
    )

    # Leave the trace untouched when its id was supplied by the user;
    # we only own traces we created ourselves.
    if not self.is_nested_trace:
        self.langfuse.trace(id=self.generation.trace_id, output=completion)

    _create_langfuse_update(
        completion,
        self.generation,
        self.completion_start_time,
        model=model,
        usage=usage,
    )

async def close(self) -> None:
Expand Down
9 changes: 5 additions & 4 deletions tests/test_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ def test_openai_chat_completion_stream():

chat_content = ""
for i in completion:
chat_content += i.choices[0].delta.content or ""
print("\n", i)
chat_content += (i.choices[0].delta.content or "") if i.choices else ""

assert len(chat_content) > 0

Expand Down Expand Up @@ -180,7 +181,7 @@ def test_openai_chat_completion_stream_with_next_iteration():
while True:
try:
c = next(completion)
chat_content += c.choices[0].delta.content or ""
chat_content += (c.choices[0].delta.content or "") if c.choices else ""

except StopIteration:
break
Expand Down Expand Up @@ -562,7 +563,7 @@ def test_openai_completion_stream():
assert iter(completion)
content = ""
for i in completion:
content += i.choices[0].text
content += (i.choices[0].text or "") if i.choices else ""

openai.flush_langfuse()

Expand Down Expand Up @@ -854,7 +855,7 @@ async def test_async_chat_stream_with_anext():
try:
c = await completion.__anext__()

result += c.choices[0].delta.content or ""
result += (c.choices[0].delta.content or "") if c.choices else ""

except StopAsyncIteration:
break
Expand Down

0 comments on commit 3dd4254

Please sign in to comment.