Buggy: streaming not working with llama-index streaming generators
esteban-puerta-rs committed Dec 11, 2024
1 parent 26eebdf commit adbbcbd
Showing 6 changed files with 132 additions and 77 deletions.
71 changes: 71 additions & 0 deletions app.py
@@ -0,0 +1,71 @@
from kitchenai.contrib.kitchenai_sdk.kitchenai import KitchenAIApp
from kitchenai.contrib.kitchenai_sdk.api import QuerySchema, EmbedSchema
from llama_index.core import VectorStoreIndex, StorageContext
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.llms.openai import OpenAI
import os
import chromadb
from llama_index.core.node_parser import TokenTextSplitter
from llama_index.core.extractors import (
TitleExtractor,
QuestionsAnsweredExtractor)
from llama_index.core import Document
from kitchenai.contrib.kitchenai_sdk.storage.llama_parser import Parser

kitchen = KitchenAIApp()

chroma_client = chromadb.PersistentClient(path="chroma_db")
chroma_collection = chroma_client.get_or_create_collection("quickstart")
llm = OpenAI(model="gpt-4")
chroma_collection_second_collection = chroma_client.get_or_create_collection("second_collection")

@kitchen.storage("simple-vector")
def simple_vector(dir: str, metadata: dict = {}, *args, **kwargs):
parser = Parser(api_key=os.environ.get("LLAMA_CLOUD_API_KEY", None))
response = parser.load(dir, metadata=metadata, **kwargs)
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
VectorStoreIndex.from_documents(
response["documents"], storage_context=storage_context, show_progress=True,
        transformations=[TokenTextSplitter(), TitleExtractor(), QuestionsAnsweredExtractor()]
)
return {"response": len(response["documents"])}

@kitchen.storage("simple-vector2")
def simple_vector2(dir: str, metadata: dict = {}, *args, **kwargs):
parser = Parser(api_key=os.environ.get("LLAMA_CLOUD_API_KEY", None))
response = parser.load(dir, metadata=metadata, **kwargs)
vector_store = ChromaVectorStore(chroma_collection=chroma_collection_second_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
VectorStoreIndex.from_documents(
response["documents"], storage_context=storage_context, show_progress=True,
        transformations=[TokenTextSplitter(), TitleExtractor(), QuestionsAnsweredExtractor()]
)
return {"response": len(response["documents"])}

@kitchen.query("simple-query")
def simple_query(data: QuerySchema):
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
index = VectorStoreIndex.from_vector_store(
vector_store,
)
query_engine = index.as_query_engine(chat_mode="best", llm=llm, verbose=True)
response = query_engine.query(data.query)
return {"response": response.response}

@kitchen.stream("simple-stream")
def simple_stream(data: QuerySchema):
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
index = VectorStoreIndex.from_vector_store(
vector_store,
)
query_engine = index.as_query_engine(streaming=True, llm=llm, similarity_top_k=1, verbose=True)
response = query_engine.query(data.query)

return response.response_gen

@kitchen.query("non-ai")
def non_ai(data: QuerySchema):
msg = "no AI is used in this function"
return {"response": msg}
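
For context: the simple-stream handler above returns llama-index's response_gen, a synchronous generator, which the new GET /stream/{label} endpoint in kitchenai/core/api.py (below) wraps in a StreamingHttpResponse. A minimal client sketch (the host, port, and /api prefix are assumptions, not part of this commit) could look like:

    # Sketch: consume the stream endpoint from a client. Assumes the server
    # is reachable at localhost:8000 and the router is mounted under /api;
    # neither is confirmed by this commit.
    import httpx

    def consume_stream(label: str = "simple-stream") -> None:
        url = f"http://localhost:8000/api/stream/{label}"  # assumed mount point
        # Iterate the response incrementally instead of buffering the body.
        with httpx.stream("GET", url, timeout=None) as response:
            for chunk in response.iter_text():
                print(chunk, end="", flush=True)

    if __name__ == "__main__":
        consume_stream()
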
62 changes: 16 additions & 46 deletions kitchenai/contrib/kitchenai_sdk/kitchenai.py
@@ -27,42 +27,18 @@ def __init__(self, router: Router = None, namespace: str = 'default', default_db
self._default_hook = "kitchenai.contrib.kitchenai_sdk.hooks.default_hook"
self._default_db = default_db
self._query_handlers = {}
+        self._query_stream_handlers = {}
self._agent_handlers = {}
        self._embed_tasks = {}
self._embed_delete_tasks = {}

# Decorators for different route types
-    def query(self, label: str, streaming=False, llama_stack_emit="", **route_kwargs):
+    def query(self, label: str, **route_kwargs):
"""Query is a decorator for query handlers with the ability to add middleware"""
def decorator(func, **route_kwargs):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
-                if streaming:
-                    #NOTE: Streaming HTTP response is only a synchronous operation. Temporary solution
-                    # async def event_generator():
-                    #     async for event in func(*args, **kwargs):
-                    #         # Flush each chunk immediately
-                    #         yield event
-                    def event_generator():
-                        # Call the synchronous function and get the generator
-                        gen = func(*args, **kwargs)
-
-                        for event in gen:
-                            # Yield each chunk formatted as Server-Sent Events
-                            yield event
-
-
-                    result = StreamingHttpResponse(
-                        event_generator(),
-                        content_type="text/event-stream",
-                        headers={
-                            'Cache-Control': 'no-cache',
-                            'Transfer-Encoding': 'chunked',
-                            'X-Accel-Buffering': 'no',
-                        }
-                    )
-                # Non-streaming behavior
-                elif asyncio.iscoroutinefunction(func):
+                if asyncio.iscoroutinefunction(func):
result = await func(*args, **kwargs)
else:
loop = asyncio.get_event_loop()
@@ -128,29 +104,12 @@ def wrapper(*args, **kwargs):
return wrapper
return decorator

-    def agent(self, label: str, streaming=False, **route_kwargs):
+    def agent(self, label: str, **route_kwargs):
"""Agent is a decorator for agent handlers with the ability to add middleware"""
def decorator(func, **route_kwargs):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
-                if streaming:
-                    #NOTE: Streaming HTTP response is only a synchronous operation
-                    async def event_generator():
-                        async for event in func(*args, **kwargs):
-                            # Flush each chunk immediately
-                            yield event
-
-                    return StreamingHttpResponse(
-                        event_generator(),
-                        content_type="text/event-stream",
-                        headers={
-                            'Cache-Control': 'no-cache',
-                            'Transfer-Encoding': 'chunked',
-                            'X-Accel-Buffering': 'no',
-                        }
-                    )
-                # Non-streaming behavior
-                elif asyncio.iscoroutinefunction(func):
+                if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
loop = asyncio.get_event_loop()
@@ -160,6 +119,16 @@ async def event_generator():
return wrapper

return decorator

+    def stream(self, label: str, **route_kwargs):
+        """Stream is a decorator for stream handlers; the decorated handler should return a generator."""
+        def decorator(func, **route_kwargs):
+
+            self._query_stream_handlers[f"{self._namespace}.{label}"] = func
+
+            return func
+
+        return decorator

def storage_create_hook(self, label: str):
"""Hooks are functions that are run after a storage task is successful"""
@@ -228,6 +197,7 @@ def to_dict(self):
        return {
            "namespace": self._namespace,
            "query_handlers": list(self._query_handlers.keys()),
+            "query_stream_handlers": list(self._query_stream_handlers.keys()),
"agent_handlers": list(self._agent_handlers.keys()),
"embed_tasks": list(self._embed_tasks.keys()),
"embed_delete_tasks": list(self._embed_delete_tasks.keys()),
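
The new stream decorator above only registers the handler under its namespaced label; nothing awaits or wraps the generator at registration time. A plausible culprit for the breakage named in the commit title is the sync/async mismatch: llama-index's response_gen is a synchronous generator, while Django under ASGI streams cleanly from an async iterator. One way this could be bridged, purely as a sketch (agen_from_sync is a hypothetical helper, not part of this commit):

    # Sketch: adapt a synchronous llama-index generator to an async iterator
    # so it can feed StreamingHttpResponse without blocking the event loop.
    # Hypothetical helper; not part of this commit.
    import asyncio
    from typing import AsyncIterator, Iterator

    async def agen_from_sync(gen: Iterator[str]) -> AsyncIterator[str]:
        loop = asyncio.get_running_loop()
        sentinel = object()
        while True:
            # next(gen, sentinel) runs on a worker thread; the sentinel default
            # keeps StopIteration from escaping the executor.
            chunk = await loop.run_in_executor(None, next, gen, sentinel)
            if chunk is sentinel:
                break
            yield chunk
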
64 changes: 39 additions & 25 deletions kitchenai/core/api.py
@@ -189,32 +189,46 @@ async def query(request, label: str, data: QuerySchema):
logger.error(f"Error in query: {e}")
return HttpError(500, "query function not found")

-import asyncio
-import datetime
@router.get("/stream/{label}")
-async def stream(request, label: str):
-    from llama_index.llms.openai import OpenAI
-    llm = OpenAI(model="gpt-4o-mini")
-    # def async_stream_completions():
-    #     completions = llm.stream_complete("Paul Graham is ")
-    #     for completion in completions:
-    #         yield completion.delta
-    print("starting stream")
-    async def mock_stream():
-        while True:
-            await asyncio.sleep(1)
-            chunk = f"Hello {datetime.datetime.now()}"
-            yield chunk
-    response_server = StreamingHttpResponse(
-        mock_stream(),
-        content_type="text/event-stream",
-        headers={
-            'Cache-Control': 'no-cache',
-            'Transfer-Encoding': 'chunked',
-            'X-Accel-Buffering': 'no',
-        }
-    )
-    return response_server
+def stream(request, label: str):
+    """Stream a query"""
+    try:
+        core_app = apps.get_app_config("core")
+        if not core_app.kitchenai_app:
+            logger.error("No kitchenai app in core app config")
+            return HttpResponse(status=404)
+
+        stream_gen_func = core_app.kitchenai_app._query_stream_handlers.get(f"{core_app.kitchenai_app._namespace}.{label}")
+        if not stream_gen_func:
+            logger.error(f"Stream function not found for {label}")
+            return HttpResponse(status=404)
+
+        # Signal the start of the query
+        # query_input_signal.send(sender="query_input", data=data)
+        print(f"Querying {label}")
+
+        stream_gen = stream_gen_func(QuerySchema(query="summarize this code for me? write me a 500 word summary"))
+        # Signal the end of the query
+        # query_output_signal.send(sender="query_output", result=result)
+
+        response_server = StreamingHttpResponse(
+            stream_gen,
+            content_type="text/event-stream",
+            headers={
+                'Cache-Control': 'no-cache',
+                'Transfer-Encoding': 'chunked',
+                'X-Accel-Buffering': 'no',
+            }
+        )
+        return response_server
+    except Exception as e:
+        logger.error(f"Error in stream: {e}")
+        return HttpError(500, "stream function not found")





class KitchenAIAppSchema(Schema):
namespace: str
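
One caveat on the endpoint above: it declares content_type="text/event-stream" but yields raw text chunks, and strict SSE consumers (such as a browser EventSource) expect each chunk framed as a "data:" line terminated by a blank line. A small framing sketch, again an assumption rather than part of this commit:

    # Sketch: frame raw text chunks as Server-Sent Events. Not part of this
    # commit; plain HTTP streaming readers work without it, EventSource does not.
    from typing import Iterable, Iterator

    def sse_frames(chunks: Iterable[str]) -> Iterator[str]:
        for chunk in chunks:
            # Each newline inside a chunk gets its own data: line so the
            # event stays valid SSE.
            for line in chunk.splitlines() or [""]:
                yield f"data: {line}\n"
            yield "\n"  # blank line terminates the event

    # Usage inside the endpoint would wrap the registered generator:
    #     StreamingHttpResponse(sse_frames(stream_gen),
    #                           content_type="text/event-stream", ...)
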
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -63,7 +63,7 @@ dependencies = [
    "dynamic-pip",
    "jupyterlab",
    "nest_asyncio",
-    "kitchenai_python_sdk",
+    "kitchenai_python_sdk>=1.3.0",
"llama-index",
"chromadb",
"llama-index-vector-stores-chroma",
6 changes: 3 additions & 3 deletions requirements-dev.txt
@@ -1,7 +1,7 @@
#
# This file is autogenerated by hatch-pip-compile with Python 3.11
#
-# [constraints] requirements.txt (SHA256: bef74194f8fd80762a1ef00469312dbc8243a3854822b60f9150002d8d517687)
+# [constraints] requirements.txt (SHA256: 6c26fa6d3d120d73e7bd515289cf73b8641cc0b40f583f763921b86f13ba9984)
#
# - django-browser-reload
# - django-debug-toolbar
@@ -53,7 +53,7 @@
# - heroicons[django]
# - honcho
# - jupyterlab
-# - kitchenai-python-sdk
+# - kitchenai-python-sdk>=1.3.0
# - llama-index
# - llama-index-vector-stores-chroma
# - nbconvert
@@ -804,7 +804,7 @@ jupyterlab-widgets==3.0.13
# via ipywidgets
keyring==25.4.1
# via hatch
-kitchenai-python-sdk==1.1.0
+kitchenai-python-sdk==1.3.0
# via
# -c requirements.txt
# hatch.envs.dev
4 changes: 2 additions & 2 deletions requirements.txt
@@ -33,7 +33,7 @@
# - heroicons[django]
# - honcho
# - jupyterlab
-# - kitchenai-python-sdk
+# - kitchenai-python-sdk>=1.3.0
# - llama-index
# - llama-index-vector-stores-chroma
# - nbconvert
@@ -426,7 +426,7 @@ jupyterlab-pygments==0.3.0
# via nbconvert
jupyterlab-server==2.27.3
# via jupyterlab
-kitchenai-python-sdk==1.1.0
+kitchenai-python-sdk==1.3.0
# via hatch.envs.default
kubernetes==31.0.0
# via chromadb
