diff --git a/app.py b/app.py
new file mode 100644
index 0000000..0138226
--- /dev/null
+++ b/app.py
@@ -0,0 +1,70 @@
+from kitchenai.contrib.kitchenai_sdk.kitchenai import KitchenAIApp
+from kitchenai.contrib.kitchenai_sdk.api import QuerySchema, EmbedSchema
+from llama_index.core import VectorStoreIndex, StorageContext
+from llama_index.vector_stores.chroma import ChromaVectorStore
+from llama_index.llms.openai import OpenAI
+import os
+import chromadb
+from llama_index.core.node_parser import TokenTextSplitter
+from llama_index.core.extractors import (
+    TitleExtractor,
+    QuestionsAnsweredExtractor)
+from llama_index.core import Document
+from kitchenai.contrib.kitchenai_sdk.storage.llama_parser import Parser
+
+kitchen = KitchenAIApp()
+
+chroma_client = chromadb.PersistentClient(path="chroma_db")
+chroma_collection = chroma_client.get_or_create_collection("quickstart")
+llm = OpenAI(model="gpt-4")
+chroma_collection_second_collection = chroma_client.get_or_create_collection("second_collection")
+
+@kitchen.storage("simple-vector")
+def simple_vector(dir: str, metadata: dict = {}, *args, **kwargs):
+    parser = Parser(api_key=os.environ.get("LLAMA_CLOUD_API_KEY", None))
+    response = parser.load(dir, metadata=metadata, **kwargs)
+    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
+    storage_context = StorageContext.from_defaults(vector_store=vector_store)
+    VectorStoreIndex.from_documents(
+        response["documents"], storage_context=storage_context, show_progress=True,
+        transformations=[TokenTextSplitter(), TitleExtractor(), QuestionsAnsweredExtractor()]
+    )
+    return {"response": len(response["documents"])}
+
+@kitchen.storage("simple-vector2")
+def simple_vector2(dir: str, metadata: dict = {}, *args, **kwargs):
+    parser = Parser(api_key=os.environ.get("LLAMA_CLOUD_API_KEY", None))
+    response = parser.load(dir, metadata=metadata, **kwargs)
+    vector_store = ChromaVectorStore(chroma_collection=chroma_collection_second_collection)
+    storage_context = StorageContext.from_defaults(vector_store=vector_store)
+    VectorStoreIndex.from_documents(
+        response["documents"], storage_context=storage_context, show_progress=True,
+        transformations=[TokenTextSplitter(), TitleExtractor(), QuestionsAnsweredExtractor()]
+    )
+    return {"response": len(response["documents"])}
+
+@kitchen.query("simple-query")
+def simple_query(data: QuerySchema):
+    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
+    index = VectorStoreIndex.from_vector_store(
+        vector_store,
+    )
+    query_engine = index.as_query_engine(chat_mode="best", llm=llm, verbose=True)
+    response = query_engine.query(data.query)
+    return {"response": response.response}
+
+@kitchen.stream("simple-stream")
+def simple_stream(data: QuerySchema):
+    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
+    index = VectorStoreIndex.from_vector_store(
+        vector_store,
+    )
+    query_engine = index.as_query_engine(streaming=True, llm=llm, similarity_top_k=1, verbose=True)
+    response = query_engine.query(data.query)
+
+    return response.response_gen
+
+@kitchen.query("non-ai")
+def non_ai(data: QuerySchema):
+    msg = "no AI is used in this function"
+    return {"response": msg}
\ No newline at end of file
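A note on the simple-stream handler above: it returns LlamaIndex's response_gen, a plain generator of text chunks, not an HTTP response. A minimal sketch of how such a handler can be exercised directly (hypothetical driver code, not part of the diff; assumes OPENAI_API_KEY is set and the "quickstart" collection was populated through the simple-vector storage handler first):

    from kitchenai.contrib.kitchenai_sdk.api import QuerySchema

    def drive_stream(handler):
        # A @kitchen.stream handler yields text deltas as the LLM produces them.
        for chunk in handler(QuerySchema(query="What is this document about?")):
            print(chunk, end="", flush=True)

    # drive_stream(simple_stream)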
diff --git a/kitchenai/contrib/kitchenai_sdk/kitchenai.py b/kitchenai/contrib/kitchenai_sdk/kitchenai.py
index 10b5770..336f2c9 100644
--- a/kitchenai/contrib/kitchenai_sdk/kitchenai.py
+++ b/kitchenai/contrib/kitchenai_sdk/kitchenai.py
@@ -27,42 +27,18 @@ def __init__(self, router: Router = None, namespace: str = 'default', default_db
         self._default_hook = "kitchenai.contrib.kitchenai_sdk.hooks.default_hook"
         self._default_db = default_db
         self._query_handlers = {}
+        self._query_stream_handlers = {}
         self._agent_handlers = {}
         self._embed_tasks= {}
         self._embed_delete_tasks = {}
 
     # Decorators for different route types
-    def query(self, label: str, streaming=False, llama_stack_emit="", **route_kwargs):
+    def query(self, label: str, **route_kwargs):
         """Query is a decorator for query handlers with the ability to add middleware"""
         def decorator(func, **route_kwargs):
             @functools.wraps(func)
             async def wrapper(*args, **kwargs):
-                if streaming:
-                    #NOTE: Streaming HTTP response is only a synchronous operation. Temporary solution
-                    # async def event_generator():
-                    #     async for event in func(*args, **kwargs):
-                    #         # Flush each chunk immediately
-                    #         yield event
-                    def event_generator():
-                        # Call the synchronous function and get the generator
-                        gen = func(*args, **kwargs)
-
-                        for event in gen:
-                            # Yield each chunk formatted as Server-Sent Events
-                            yield event
-
-
-                    result = StreamingHttpResponse(
-                        event_generator(),
-                        content_type="text/event-stream",
-                        headers={
-                            'Cache-Control': 'no-cache',
-                            'Transfer-Encoding': 'chunked',
-                            'X-Accel-Buffering': 'no',
-                        }
-                    )
-                # Non-streaming behavior
-                elif asyncio.iscoroutinefunction(func):
+                if asyncio.iscoroutinefunction(func):
                     result = await func(*args, **kwargs)
                 else:
                     loop = asyncio.get_event_loop()
@@ -128,29 +104,12 @@ def wrapper(*args, **kwargs):
             return wrapper
         return decorator
 
-    def agent(self, label: str, streaming=False, **route_kwargs):
+    def agent(self, label: str, **route_kwargs):
         """Agent is a decorator for agent handlers with the ability to add middleware"""
         def decorator(func, **route_kwargs):
             @functools.wraps(func)
             async def wrapper(*args, **kwargs):
-                if streaming:
-                    #NOTE: Streaming HTTP response is only a synchronous operation
-                    async def event_generator():
-                        async for event in func(*args, **kwargs):
-                            # Flush each chunk immediately
-                            yield event
-
-                    return StreamingHttpResponse(
-                        event_generator(),
-                        content_type="text/event-stream",
-                        headers={
-                            'Cache-Control': 'no-cache',
-                            'Transfer-Encoding': 'chunked',
-                            'X-Accel-Buffering': 'no',
-                        }
-                    )
-                # Non-streaming behavior
-                elif asyncio.iscoroutinefunction(func):
+                if asyncio.iscoroutinefunction(func):
                     return await func(*args, **kwargs)
                 else:
                     loop = asyncio.get_event_loop()
@@ -160,6 +119,16 @@ async def event_generator():
             return wrapper
         return decorator
+
+    def stream(self, label: str, **route_kwargs):
+        """Stream is a decorator for stream handlers; the decorated function should return a generator."""
+        def decorator(func, **route_kwargs):
+
+            self._query_stream_handlers[f"{self._namespace}.{label}"] = func
+
+            return func
+
+        return decorator
 
     def storage_create_hook(self, label: str):
         """Hooks are functions that are run after a storage task is successful"""
@@ -228,6 +197,7 @@ def to_dict(self):
         return {
             "namespace": self._namespace,
             "query_handlers": list(self._query_handlers.keys()),
+            "query_stream_handlers": list(self._query_stream_handlers.keys()),
             "agent_handlers": list(self._agent_handlers.keys()),
             "embed_tasks": list(self._embed_tasks.keys()),
             "embed_delete_tasks": list(self._embed_delete_tasks.keys()),
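Unlike query() and agent(), the new stream() decorator does not wrap the handler in an async adapter; it registers the raw function in _query_stream_handlers keyed by "<namespace>.<label>" and returns it unchanged. A small sketch of that contract (illustrative names; the handler below is made up):

    from kitchenai.contrib.kitchenai_sdk.kitchenai import KitchenAIApp

    kitchen = KitchenAIApp(namespace="default")

    @kitchen.stream("demo-stream")
    def demo_stream(data):
        # The registry stores this function as-is, so calling it yields chunks.
        yield "hello "
        yield "world"

    handler = kitchen._query_stream_handlers["default.demo-stream"]
    assert list(handler(None)) == ["hello ", "world"]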
diff --git a/kitchenai/core/api.py b/kitchenai/core/api.py
index 608b2f1..92bbc3c 100644
--- a/kitchenai/core/api.py
+++ b/kitchenai/core/api.py
@@ -189,32 +189,41 @@ async def query(request, label: str, data: QuerySchema):
         logger.error(f"Error in query: {e}")
         return HttpError(500, "query function not found")
 
-import asyncio
-import datetime
 @router.get("/stream/{label}")
-async def stream(request, label: str):
-    from llama_index.llms.openai import OpenAI
-    llm = OpenAI(model="gpt-4o-mini")
-    # def async_stream_completions():
-    #     completions = llm.stream_complete("Paul Graham is ")
-    #     for completion in completions:
-    #         yield completion.delta
-    print("starting stream")
-    async def mock_stream():
-        while True:
-            await asyncio.sleep(1)
-            chunk = f"Hello {datetime.datetime.now()}"
-            yield chunk
-    response_server = StreamingHttpResponse(
-        mock_stream(),
-        content_type="text/event-stream",
-        headers={
-            'Cache-Control': 'no-cache',
-            'Transfer-Encoding': 'chunked',
-            'X-Accel-Buffering': 'no',
-        }
-    )
-    return response_server
+def stream(request, label: str):
+    """Stream a query"""
+    try:
+        core_app = apps.get_app_config("core")
+        if not core_app.kitchenai_app:
+            logger.error("No kitchenai app in core app config")
+            return HttpResponse(status=404)
+
+        stream_gen_func = core_app.kitchenai_app._query_stream_handlers.get(f"{core_app.kitchenai_app._namespace}.{label}")
+        if not stream_gen_func:
+            logger.error(f"Stream function not found for {label}")
+            return HttpResponse(status=404)
+
+        # Signal the start of the query
+        # query_input_signal.send(sender="query_input", data=data)
+        logger.debug(f"Querying {label}")
+
+        stream_gen = stream_gen_func(QuerySchema(query="summarize this code for me? write me a 500 word summary"))
+        # Signal the end of the query
+        # query_output_signal.send(sender="query_output", result=result)
+
+        response_server = StreamingHttpResponse(
+            stream_gen,
+            content_type="text/event-stream",
+            headers={
+                'Cache-Control': 'no-cache',
+                'Transfer-Encoding': 'chunked',
+                'X-Accel-Buffering': 'no',
+            }
+        )
+        return response_server
+    except Exception as e:
+        logger.error(f"Error in stream: {e}")
+        return HttpError(500, "stream function not found")
 
 
 class KitchenAIAppSchema(Schema):
     namespace: str
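One caveat: StreamingHttpResponse forwards whatever the generator yields, so the endpoint above emits raw text chunks even though the content type is text/event-stream. If a browser EventSource client is the intended consumer, each chunk needs SSE framing; a minimal wrapper sketch (sse_format is a hypothetical helper, not part of the diff):

    def sse_format(gen):
        # An SSE event is "data: <payload>" terminated by a blank line.
        for chunk in gen:
            yield f"data: {chunk}\n\n"

    # response_server = StreamingHttpResponse(sse_format(stream_gen), ...)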
diff --git a/pyproject.toml b/pyproject.toml
index ccb83c2..f472516 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -63,7 +63,7 @@ dependencies = [
     "dynamic-pip",
     "jupyterlab",
     "nest_asyncio",
-    "kitchenai_python_sdk",
+    "kitchenai_python_sdk>=1.3.0",
     "llama-index",
     "chromadb",
     "llama-index-vector-stores-chroma",
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 5b7ed80..68eba7f 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,7 +1,7 @@
 #
 # This file is autogenerated by hatch-pip-compile with Python 3.11
 #
-# [constraints] requirements.txt (SHA256: bef74194f8fd80762a1ef00469312dbc8243a3854822b60f9150002d8d517687)
+# [constraints] requirements.txt (SHA256: 6c26fa6d3d120d73e7bd515289cf73b8641cc0b40f583f763921b86f13ba9984)
 #
 # - django-browser-reload
 # - django-debug-toolbar
@@ -53,7 +53,7 @@
 # - heroicons[django]
 # - honcho
 # - jupyterlab
-# - kitchenai-python-sdk
+# - kitchenai-python-sdk>=1.3.0
 # - llama-index
 # - llama-index-vector-stores-chroma
 # - nbconvert
@@ -804,7 +804,7 @@ jupyterlab-widgets==3.0.13
     # via ipywidgets
 keyring==25.4.1
     # via hatch
-kitchenai-python-sdk==1.1.0
+kitchenai-python-sdk==1.3.0
     # via
     #   -c requirements.txt
     #   hatch.envs.dev
diff --git a/requirements.txt b/requirements.txt
index 3e1a9e5..5589f4d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -33,7 +33,7 @@
 # - heroicons[django]
 # - honcho
 # - jupyterlab
-# - kitchenai-python-sdk
+# - kitchenai-python-sdk>=1.3.0
 # - llama-index
 # - llama-index-vector-stores-chroma
 # - nbconvert
@@ -426,7 +426,7 @@ jupyterlab-pygments==0.3.0
     # via nbconvert
 jupyterlab-server==2.27.3
     # via jupyterlab
-kitchenai-python-sdk==1.1.0
+kitchenai-python-sdk==1.3.0
     # via hatch.envs.default
 kubernetes==31.0.0
     # via chromadb