Skip to content

Commit

Permalink
feat: Adding AlloyDBIndexStore
Browse files Browse the repository at this point in the history
  • Loading branch information
dishaprakash committed Nov 27, 2024
1 parent 03c427a commit 7152394
Show file tree
Hide file tree
Showing 3 changed files with 460 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/llama_index_alloydb_pg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
# limitations under the License.

from .engine import AlloyDBEngine, Column
from .index_store import AlloyDBIndexStore

_all = ["AlloyDBEngine", "Column"]
_all = ["AlloyDBEngine", "Column", "AlloyDBIndexStore"]
153 changes: 153 additions & 0 deletions src/llama_index_alloydb_pg/index_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import json
from typing import List, Optional

from llama_index.core.data_structs.data_structs import IndexStruct
from llama_index.core.storage.index_store.types import BaseIndexStore
from llama_index.core.storage.kvstore.types import DEFAULT_BATCH_SIZE

from .async_index_store import AsyncAlloyDBIndexStore
from .engine import AlloyDBEngine


class AlloyDBIndexStore(BaseIndexStore):
"""Index Store Table stored in an AlloyDB for PostgreSQL database."""

__create_key = object()

def __init__(
self, key: object, engine: AlloyDBEngine, index_store: AsyncAlloyDBIndexStore
):
"""AlloyDBIndexStore constructor.
Args:
key (object): Key to prevent direct constructor usage.
engine (AlloyDBEngine): Database connection pool.
table_name (str): Table name that stores the index metadata.
schema_name (str): The schema name where the table is located. Defaults to "public"
batch_size (str): The default batch size for bulk inserts. Defaults to 1.
Raises:
Exception: If constructor is directly called by the user.
"""
if key != AlloyDBIndexStore.__create_key:
raise Exception(
"Only create class through 'create' or 'create_sync' methods!"
)
self._engine = engine
self.__index_store = index_store

@classmethod
async def create(
cls,
engine: AlloyDBEngine,
table_name: str,
schema_name: str = "public",
batch_size: int = DEFAULT_BATCH_SIZE,
) -> AlloyDBIndexStore:
"""Create a new AlloyDBIndexStore instance.
Args:
engine (AlloyDBEngine): AlloyDB engine to use.
table_name (str): Table name that stores the index metadata.
schema_name (str): The schema name where the table is located. Defaults to "public"
batch_size (str): The default batch size for bulk inserts. Defaults to 1.
Raises:
IndexError: If the table provided does not contain required schema.
Returns:
AlloyDBIndexStore: A newly created instance of AlloyDBIndexStore.
"""
coro = AsyncAlloyDBIndexStore.create(
engine, table_name, schema_name
)
index_store = await engine._run_as_async(coro)
return cls(cls.__create_key, engine, index_store)

@classmethod
def create_sync(
cls,
engine: AlloyDBEngine,
table_name: str,
schema_name: str = "public",
batch_size: int = DEFAULT_BATCH_SIZE,
) -> AlloyDBIndexStore:
"""Create a new AlloyDBIndexStore sync instance.
Args:
engine (AlloyDBEngine): AlloyDB engine to use.
table_name (str): Table name that stores the index metadata.
schema_name (str): The schema name where the table is located. Defaults to "public"
batch_size (str): The default batch size for bulk inserts. Defaults to 1.
Raises:
IndexError: If the table provided does not contain required schema.
Returns:
AlloyDBIndexStore: A newly created instance of AlloyDBIndexStore.
"""
coro = AsyncAlloyDBIndexStore.create(
engine, table_name, schema_name
)
index_store = engine._run_as_sync(coro)
return cls(cls.__create_key, engine, index_store)

async def aindex_structs(self) -> List[IndexStruct]:
return await self._engine._run_as_async(self.__index_store.aindex_structs())

def index_structs(self) -> List[IndexStruct]:
return self._engine._run_as_sync(self.__index_store.aindex_structs())

async def aadd_index_struct(self, index_struct: IndexStruct) -> None:
"""Add an index struct.
Args:
index_struct (IndexStruct): index struct
"""
return await self._engine._run_as_async(
self.__index_store.aadd_index_struct(index_struct)
)

def add_index_struct(self, index_struct: IndexStruct) -> None:
return self._engine._run_as_sync(
self.__index_store.aadd_index_struct(index_struct)
)

async def adelete_index_struct(self, key: str) -> None:
return await self._engine._run_as_async(
self.__index_store.adelete_index_struct(key)
)

def delete_index_struct(self, key: str) -> None:
return self._engine._run_as_sync(self.__index_store.adelete_index_struct(key))

async def aget_index_struct(
self, struct_id: Optional[str] = None
) -> Optional[IndexStruct]:
return await self._engine._run_as_async(
self.__index_store.aget_index_struct(struct_id)
)

def get_index_struct(
self, struct_id: Optional[str] = None
) -> Optional[IndexStruct]:
return self._engine._run_as_sync(
self.__index_store.aget_index_struct(struct_id)
)
Loading

0 comments on commit 7152394

Please sign in to comment.