Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

haystack integration #59

Open
davidberenstein1957 opened this issue Jan 28, 2025 · 0 comments
Open

haystack integration #59

davidberenstein1957 opened this issue Jan 28, 2025 · 0 comments

Comments

@davidberenstein1957
Copy link
Contributor

davidberenstein1957 commented Jan 28, 2025

Some integrations with tools like Haystack would be useful — for example, a Vicinity-backed DocumentStore analogous to https://docs.haystack.deepset.ai/docs/inmemorydocumentstore

from typing import Any, Dict, Iterable, List, Optional

import numpy as np
from vicinity import Vicinity

from haystack import logging
from haystack.dataclasses import Document
from haystack.document_stores.errors import DuplicateDocumentError
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils import expit
from haystack.utils.filters import document_matches_filter

logger = logging.getLogger(__name__)


class VicinityDocumentStore:
    """Haystack-compatible document store backed by a Vicinity vector index.

    Implements the subset of the Haystack ``DocumentStore`` protocol needed for
    counting, filtering, writing and deleting documents, plus BM25 and
    embedding-based retrieval.
    """

    # Scaling factor used to squash BM25 scores through expit() into (0, 1)
    # when ``scale_score=True``; value matches Haystack's InMemoryDocumentStore.
    # BUGFIX: the original referenced an undefined module-level name.
    BM25_SCALING_FACTOR = 8

    def __init__(self, backend_type: str = "basic", **kwargs):
        """Create an empty store.

        :param backend_type: Name of the Vicinity backend to use (e.g. "basic").
        :param kwargs: Extra keyword arguments forwarded to the Vicinity backend.
        """
        self.vicinity = Vicinity.from_vectors_and_items(
            vectors=[], items=[], backend_type=backend_type, store_vectors=True, **kwargs
        )

    def save_to_disk(self, path: str):
        """Persist the underlying Vicinity index to ``path``."""
        self.vicinity.save(path)

    def load_from_disk(self, path: str):
        """Replace the current index with one loaded from ``path``.

        BUGFIX: ``Vicinity.load`` is a constructor-style classmethod returning a
        new instance; the original discarded its result, leaving the store
        unchanged. Rebind the attribute instead.
        """
        self.vicinity = Vicinity.load(path)

    def save_to_hub(self, hub_name: str, hub_model_id: str):
        """Push the index to the Hugging Face Hub.

        NOTE(review): argument names/shape should be verified against the
        vicinity hub API — confirm.
        """
        self.vicinity.save_to_hub(hub_name, hub_model_id)

    def load_from_hub(self, hub_name: str, hub_model_id: str):
        """Replace the current index with one downloaded from the Hub.

        BUGFIX: like ``load``, ``load_from_hub`` presumably returns a new
        instance rather than mutating in place — rebind the attribute
        (TODO confirm against the vicinity API).
        """
        self.vicinity = Vicinity.load_from_hub(hub_name, hub_model_id)

    def count_documents(self) -> int:
        """Return the number of documents currently stored.

        BUGFIX: the original defined ``count_documents`` twice; the second
        definition (which forwarded to a non-existent ``vicinity.add_documents``)
        shadowed this one, so counting raised a TypeError at runtime. The broken
        duplicate has been removed — use ``write_documents`` to add documents.
        """
        return len(self.vicinity.items)

    def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
        """
        Returns the documents that match the filters provided.

        For a detailed specification of the filters, refer to the DocumentStore.filter_documents() protocol
        documentation.

        :param filters: The filters to apply to the document list.
        :returns: A list of Documents that match the given filters.
        :raises ValueError: If ``filters`` is not in the Haystack filter syntax.
        """
        if filters:
            # Only the new-style filter syntax (top-level "operator"/"conditions")
            # is supported.
            if "operator" not in filters and "conditions" not in filters:
                raise ValueError(
                    "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
                )
            return [doc for doc in self.vicinity.items if document_matches_filter(filters=filters, document=doc)]
        return list(self.vicinity.items)

    def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
        """
        Refer to the DocumentStore.write_documents() protocol documentation.

        If `policy` is set to `DuplicatePolicy.NONE` defaults to `DuplicatePolicy.FAIL`.

        :param documents: Documents to write; each must be a haystack ``Document``.
        :param policy: How to handle documents whose id already exists.
        :returns: The number of documents actually written.
        :raises ValueError: If ``documents`` is not a list of ``Document``s.
        :raises DuplicateDocumentError: On a duplicate id with ``DuplicatePolicy.FAIL``.
        """
        if (
            not isinstance(documents, Iterable)
            or isinstance(documents, str)
            or any(not isinstance(doc, Document) for doc in documents)
        ):
            raise ValueError("Please provide a list of Documents.")

        if policy == DuplicatePolicy.NONE:
            policy = DuplicatePolicy.FAIL

        written_documents = len(documents)
        for document in documents:
            # Re-scan per document on purpose: an earlier insert in this same
            # batch must be visible as a duplicate for later documents.
            document_in_index = any(item.id == document.id for item in self.vicinity.items)
            if policy != DuplicatePolicy.OVERWRITE and document_in_index:
                if policy == DuplicatePolicy.FAIL:
                    raise DuplicateDocumentError(f"ID '{document.id}' already exists.")
                if policy == DuplicatePolicy.SKIP:
                    logger.warning("ID '{document_id}' already exists", document_id=document.id)
                    written_documents -= 1
                    continue

            # Since the statistics are updated in an incremental manner,
            # we need to explicitly remove the existing document to revert
            # the statistics before updating them with the new document.
            if document_in_index:
                self.vicinity.delete([document])

            # NOTE(review): verify the keyword against the vicinity ``insert``
            # signature — the API may expect positional ``vectors`` instead.
            self.vicinity.insert([document], embedding=np.array(document.embedding))
        return written_documents

    def delete_documents(self, document_ids: List[str]) -> None:
        """
        Deletes all documents with matching document_ids from the DocumentStore.

        :param document_ids: The object_ids to delete.
        """
        # Membership test against a set: one O(n) pass instead of the original
        # O(n * len(document_ids)) nested scan.
        ids_to_delete = set(document_ids)
        items_to_delete = [item for item in self.vicinity.items if item.id in ids_to_delete]
        self.vicinity.delete(items_to_delete)

    def bm25_retrieval(
        self, query: str, filters: Optional[Dict[str, Any]] = None, top_k: int = 10, scale_score: bool = False
    ) -> List[Document]:
        """
        Retrieves documents that are most relevant to the query using BM25 algorithm.

        :param query: The query string.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved documents. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
        :raises ValueError: If ``query`` is empty or ``filters`` is malformed.
        """
        if not query:
            raise ValueError("Query should be a non-empty string")

        # BM25 only makes sense for documents carrying textual/tabular content.
        content_type_filter = {
            "operator": "OR",
            "conditions": [
                {"field": "content", "operator": "!=", "value": None},
                {"field": "dataframe", "operator": "!=", "value": None},
            ],
        }
        if filters:
            if "operator" not in filters:
                raise ValueError(
                    "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
                )
            filters = {"operator": "AND", "conditions": [content_type_filter, filters]}
        else:
            filters = content_type_filter

        all_documents = self.filter_documents(filters=filters)
        if len(all_documents) == 0:
            logger.info("No documents found for BM25 retrieval. Returning empty list.")
            return []

        # TODO(review): ``bm25_algorithm_inst`` and ``bm25_algorithm`` are never
        # initialized anywhere in this class — they must be set up (e.g. from
        # rank_bm25 in __init__) before this method can run.
        results = sorted(self.bm25_algorithm_inst(query, all_documents), key=lambda x: x[1], reverse=True)[:top_k]

        # BM25Okapi can return meaningful negative values, so they should not be filtered out when scale_score is False.
        # It's the only algorithm supported by rank_bm25 at the time of writing (2024) that can return negative scores.
        # see https://github.com/deepset-ai/haystack/pull/6889 for more context.
        negatives_are_valid = self.bm25_algorithm == "BM25Okapi" and not scale_score

        # Create documents with the BM25 score to return them
        return_documents = []
        for doc, score in results:
            if scale_score:
                # BUGFIX: the original referenced an undefined module-level
                # BM25_SCALING_FACTOR; the constant now lives on the class.
                score = expit(score / self.BM25_SCALING_FACTOR)

            if not negatives_are_valid and score <= 0.0:
                continue

            doc_fields = doc.to_dict()
            doc_fields["score"] = score
            return_document = Document.from_dict(doc_fields)
            return_documents.append(return_document)

        return return_documents

    def embedding_retrieval(  # pylint: disable=too-many-positional-arguments
        self,
        query_embedding: List[float],
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 10,
        scale_score: bool = False,
        return_embedding: bool = False,
    ) -> List[Document]:
        """
        Retrieves documents that are most similar to the query embedding using a vector similarity metric.

        :param query_embedding: Embedding of the query.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved Documents. Default is False.
        :param return_embedding: Whether to return the embedding of the retrieved Documents. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
        :raises ValueError: If ``query_embedding`` is empty or not a list of floats.
        """
        if len(query_embedding) == 0 or not isinstance(query_embedding[0], float):
            raise ValueError("query_embedding should be a non-empty list of floats.")

        # NOTE(review): this assumes vicinity.query returns (index, distance)
        # pairs; confirm against the vicinity API, which may return
        # (item, score) tuples directly.
        k_nearest_neighbors = self.vicinity.query(np.array(query_embedding), top_k=top_k)
        all_documents = [self.vicinity.items[i[0]] for i in k_nearest_neighbors]
        distances = [i[1] for i in k_nearest_neighbors]

        # Post-filter the neighbors; an empty filter dict matches everything.
        filters = filters or {}
        all_documents = [
            (doc, distance)
            for doc, distance in zip(all_documents, distances)
            if document_matches_filter(filters=filters, document=doc)
        ]

        # create Documents with the similarity score for the top k results
        top_documents = []
        for doc, score in sorted(all_documents, key=lambda x: x[1], reverse=True)[:top_k]:
            doc_fields = doc.to_dict()
            doc_fields["score"] = score
            if return_embedding is False:
                doc_fields["embedding"] = None
            top_documents.append(Document.from_dict(doc_fields))

        return top_documents
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant