Catch the 2.15.x releases (#1120)
Merge the patches to 2.15.x into mainline:
- Fix an issue where Marqo cannot load legacy OpenAI CLIP models (#1116).
- Improve the performance of large-limit searches (#1116).
- Fix hybrid search behaviour where any error is raised as a generic 500 error (#1116). Users will now receive the correct error code and error messages when one or more errors occur in a hybrid search with the `disjunction` retrieval method.
wanliAlex authored Feb 18, 2025
1 parent 025271b commit ca5f03b
Showing 19 changed files with 531 additions and 62 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -6,7 +6,7 @@ COPY vespa .
RUN mvn clean package

# Stage 2: Base image for Python setup
FROM marqoai/marqo-base:48 as base_image
FROM marqoai/marqo-base:49 as base_image

# Allow mounting volume containing data and configs for vespa
VOLUME /opt/vespa/var
16 changes: 16 additions & 0 deletions RELEASE.md
@@ -84,6 +84,22 @@ Upgrade the Marqo Docker image to use Python 3.9. With Python 3.8 reaching its E
- Fix a bug preventing Marqo from warming up Languagebind models. Marqo now successfully warms up Languagebind models as expected ([1031](https://github.com/marqo-ai/marqo/pull/1031)).
- Fix a bug where Languagebind models always generate normalized embeddings for non-text content. These models now correctly produce unnormalized embeddings for video, audio, and image content ([1032](https://github.com/marqo-ai/marqo/pull/1032)).

# Release 2.13.2

## Bug fixes and minor changes

- Fix a bug where adding documents with numeric lists to an unstructured index results in a 500 error. Now, Marqo successfully processes the document batch, and returns a 400 error only for individual documents that contain numeric lists([1034](https://github.com/marqo-ai/marqo/pull/1034)).
- Fix validation of custom vector fields. Custom vector fields were silently ignored when not specified as tensor fields for an unstructured index. This will now trigger a 400 error. This helps guide users to properly define the field as a tensor field([1034](https://github.com/marqo-ai/marqo/pull/1034)).
- Improve the bootstrapping process to prevent Marqo from crashing during startup when the vector store takes longer to converge, especially with multiple indexes. This ensures a smoother startup process even if the vector store takes time to fully initialize([1036](https://github.com/marqo-ai/marqo/pull/1036)).

# Release 2.13.1

## Bug fixes and minor changes

- Fix a bug where Marqo returns a 500 error if an inaccessible private image is encountered in the query or embed endpoint. Marqo now correctly returns a 400 error with a helpful error message ([1027](https://github.com/marqo-ai/marqo/pull/1027)).
- Fix a bug preventing Marqo from warming up Languagebind models. Marqo now successfully warms up Languagebind models as expected ([1031](https://github.com/marqo-ai/marqo/pull/1031)).
- Fix a bug where Languagebind models always generate normalized embeddings for non-text content. These models now correctly produce unnormalized embeddings for video, audio, and image content ([1032](https://github.com/marqo-ai/marqo/pull/1032)).

# Release 2.13.0

## New features
2 changes: 1 addition & 1 deletion scripts/vespa_local/vespa_local.py
@@ -2,7 +2,7 @@

import os

VESPA_VERSION=os.getenv('VESPA_VERSION', '8.431.32') # default version baked into marqo-base:44
VESPA_VERSION=os.getenv('VESPA_VERSION', '8.472.109') # Vespa version to use as marqo-base:49


def start(args):
3 changes: 2 additions & 1 deletion src/marqo/core/inference/embedding_models/open_clip_model.py
@@ -210,7 +210,8 @@ def _load_model_and_image_preprocessor_from_open_clip_repo(self) -> Tuple[torch.

def _load_tokenizer_from_checkpoint(self) -> Callable:
if not self.model_properties.tokenizer:
return open_clip.get_tokenizer(self.model_properties.name)
# Replace '/'with '-' to support old clip model name style
return open_clip.get_tokenizer(self.model_properties.name.replace("/", "-"))
else:
logger.info(f"Custom HFTokenizer is provided. Loading...")
return HFTokenizer(self.model_properties.tokenizer)
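
The tokenizer change above can be illustrated in isolation. This is a minimal sketch, assuming the only incompatibility is the `/` in legacy OpenAI CLIP names such as `ViT-B/32`. `normalize_clip_model_name` is a hypothetical helper introduced for illustration; it is not part of Marqo or open_clip, and the commit simply inlines the `replace` call.

```python
def normalize_clip_model_name(name: str) -> str:
    """Map a legacy slash-style CLIP name (e.g. 'ViT-B/32') to the dash style ('ViT-B-32')."""
    # Hypothetical helper: the commit performs this inline via name.replace("/", "-").
    return name.replace("/", "-")


legacy = normalize_clip_model_name("ViT-B/32")
modern = normalize_clip_model_name("ViT-B-32")  # already dash style, so unchanged
print(legacy, modern)
```

Names that already use dashes pass through unchanged, so the replacement should be safe to apply unconditionally.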
@@ -62,7 +62,7 @@ def from_vespa_document(cls, document: Dict, marqo_index: SemiStructuredMarqoInd
text_fields[field_name] = fields[field_name]

return cls(id=document[cls._VESPA_DOC_ID],
fixed_fields=SemiStructuredVespaDocumentFields(**fields),
fixed_fields=SemiStructuredVespaDocumentFields.construct(**fields),
tensor_fields=tensor_fields,
text_fields=text_fields,
raw_tensor_score=cls.extract_field(fields, common.VESPA_DOC_HYBRID_RAW_TENSOR_SCORE, None),
5 changes: 3 additions & 2 deletions src/marqo/tensor_search/api.py
@@ -7,7 +7,7 @@
from fastapi import Depends, FastAPI, Request
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, ORJSONResponse
from starlette.status import HTTP_422_UNPROCESSABLE_ENTITY

from marqo import config, marqo_docs
@@ -321,7 +321,7 @@ def search(search_query: SearchQuery, index_name: str, device: str = Depends(api
[Search API document](https://docs.marqo.ai/latest/reference/api/search/search/) for details.
"""
with RequestMetricsStore.for_request().time(f"POST /indexes/{index_name}/search"):
return tensor_search.search(
result = tensor_search.search(
config=marqo_config, text=search_query.q,
index_name=index_name, highlights=search_query.showHighlights,
searchable_attributes=search_query.searchableAttributes,
@@ -339,6 +339,7 @@ def search(search_query: SearchQuery, index_name: str, device: str = Depends(api
text_query_prefix=search_query.textQueryPrefix,
hybrid_parameters=search_query.hybridParameters
)
return ORJSONResponse(result)


@app.post("/indexes/{index_name}/recommend")
49 changes: 32 additions & 17 deletions src/marqo/tensor_search/tensor_search.py
@@ -32,14 +32,13 @@
"""
import copy
import json
import traceback
import typing
import uuid
import os
from collections import defaultdict
from contextlib import ExitStack
from timeit import default_timer as timer
from typing import List, Optional, Union, Iterable, Sequence, Dict, Any, Tuple
from typing import List, Optional, Union, Iterable, Sequence, Dict, Any, Tuple, Set

import numpy as np
import psutil
@@ -1650,7 +1649,11 @@ def search(config: Config, index_name: str, text: Optional[Union[str, dict, Cust
except Exception as e:
raise api_exceptions.BadRequestError(f"reranking failure due to {str(e)}")

search_result["query"] = text
if isinstance(text, CustomVectorQuery):
search_result["query"] = text.dict() # Make object JSON serializable
else:
search_result["query"] = text

search_result["limit"] = result_count
search_result["offset"] = offset
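
The `isinstance` branch above exists because an arbitrary object placed in the result cannot be serialized to JSON as-is. This is a minimal sketch using only the standard library. `CustomVectorQueryStandIn` and its field names are invented for illustration and do not reflect the real `CustomVectorQuery` model, and `dataclasses.asdict` stands in for the `.dict()` call in the diff.

```python
import json
from dataclasses import asdict, dataclass
from typing import List


@dataclass
class CustomVectorQueryStandIn:
    # Hypothetical stand-in; field names are assumptions, not Marqo's schema.
    content: str
    vector: List[float]


query = CustomVectorQueryStandIn(content="red shoes", vector=[0.1, 0.2])

# Serializing the object directly fails because json does not know the type.
try:
    json.dumps({"query": query})
    serializable = True
except TypeError:
    serializable = False

# Converting to a plain dict first makes the response serializable.
encoded = json.dumps({"query": asdict(query)})
print(serializable, encoded)
```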

@@ -1789,16 +1792,26 @@ def gather_documents_from_response(response: QueryResult, marqo_index: MarqoInde
"""
Convert a VespaQueryResponse to a Marqo search response
"""

if (marqo_index.type in [IndexType.Unstructured, IndexType.SemiStructured] and
attributes_to_retrieve is not None):
# Unstructured index and Semi-structured index stores fixed fields (numeric, boolean, string arrays, etc.) in
# combined field. It needs to select attributes after converting vespa doc to marqo doc if
# attributes_to_retrieve is specified
metadata_fields_to_retrieve = {"_id", "_score", "_highlights"}
attributes_to_retrieve_set = set(attributes_to_retrieve).union(metadata_fields_to_retrieve)
else:
# If this set is None, we will return the marqo_doc as is.
attributes_to_retrieve_set = None

vespa_index = vespa_index_factory(marqo_index)
hits = []
for doc in response.hits:
marqo_doc = vespa_index.to_marqo_document(doc.dict(), return_highlights=highlights)
marqo_doc = vespa_index.to_marqo_document(dict(doc), return_highlights=highlights)
marqo_doc['_score'] = doc.relevance

if (marqo_index.type in [IndexType.Unstructured, IndexType.SemiStructured] and
attributes_to_retrieve is not None):
# For an unstructured index, we do the attributes_to_retrieve after search
marqo_doc = unstructured_index_attributes_to_retrieve(marqo_doc, attributes_to_retrieve)
if attributes_to_retrieve_set is not None:
marqo_doc = select_attributes(marqo_doc, attributes_to_retrieve_set)

# Delete chunk data
if constants.MARQO_DOC_TENSORS in marqo_doc:
@@ -1808,16 +1821,18 @@ def gather_documents_from_response(response: QueryResult, marqo_index: MarqoInde
return {'hits': hits}


def unstructured_index_attributes_to_retrieve(marqo_doc: Dict[str, Any], attributes_to_retrieve: List[str]) -> Dict[
str, Any]:
# attributes_to_retrieve should already be validated at the start of search
attributes_to_retrieve = list(set(attributes_to_retrieve).union({"_id", "_score", "_highlights"}))
return {k: v for k, v in marqo_doc.items() if k in attributes_to_retrieve or
# Please note that numeric map fields are flattened for unstructured or semi-structured indexes.
# Therefore, when filtering on attributes_to_retrieve, we need to also include flattened map fields
# with the specified attributes as prefixes. We keep this behaviour only for compatibility reasons.
any([k.startswith(attribute + ".") for attribute in attributes_to_retrieve])}
def select_attributes(marqo_doc: Dict[str, Any], attributes_to_retrieve_set: Set[str]) -> Dict[str, Any]:
"""
Unstructured index and Semi-structured index retrieve all fixed fields (numeric, boolean, string arrays, etc.)
from Vespa when attributes_to_retrieve is specified. After converting the Vespa doc to Marqo doc, it needs to
filter out attributes not in the attributes_to_retrieve list.
Please note that numeric map fields are flattened for unstructured or semi-structured indexes.
Therefore, when filtering on attributes_to_retrieve, we need to also include flattened map fields
with the specified attributes as prefixes. We keep this behaviour only for compatibility reasons.
"""
return {k: v for k, v in marqo_doc.items() if k in attributes_to_retrieve_set or
'.' in k and k.split('.', maxsplit=1)[0] in attributes_to_retrieve_set}
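
The filtering rule in `select_attributes` can be exercised on its own. This is a minimal sketch that copies the comprehension from the diff and adds explicit parentheses for readability. The sample document and the requested attribute names are hypothetical.

```python
from typing import Any, Dict, Set


def select_attributes(marqo_doc: Dict[str, Any], attributes_to_retrieve_set: Set[str]) -> Dict[str, Any]:
    # Keep a key if it was requested directly, or if it is a flattened map entry
    # ("parent.child") whose text before the first dot was requested.
    return {
        k: v for k, v in marqo_doc.items()
        if k in attributes_to_retrieve_set
        or ('.' in k and k.split('.', maxsplit=1)[0] in attributes_to_retrieve_set)
    }


# Hypothetical document with a flattened numeric map field "sizes".
doc = {"_id": "1", "_score": 0.9, "title": "shirt", "price": 10,
       "sizes.small": 3, "sizes.large": 5}

# The metadata fields are merged into the set once, outside the per-hit loop.
requested = {"title", "sizes"} | {"_id", "_score", "_highlights"}
selected = select_attributes(doc, requested)
print(selected)
```

Building the set once and using membership checks avoids rescanning the attribute list for every key of every hit, which is presumably where the large-limit improvement comes from. Note that the match uses only the text before the first dot, so a requested name that itself contains a dot matches only as an exact key.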

def assign_query_to_vector_job(
q: BulkSearchQueryEntity, jobs: Dict[JHash, VectorisedJobs],
2 changes: 1 addition & 1 deletion src/marqo/version.py
@@ -1,4 +1,4 @@
__version__ = "2.15.1"
__version__ = "2.15.2"

def get_version() -> str:
return f"{__version__}"
2 changes: 1 addition & 1 deletion src/marqo/vespa/models/__init__.py
@@ -1,5 +1,5 @@
from .delete_document_response import DeleteDocumentResponse
from .feed_response import FeedBatchDocumentResponse, FeedBatchResponse, FeedDocumentResponse
from .query_result import QueryResult
from .query_result import QueryResult, Error
from .update_response import UpdateDocumentResponse, UpdateDocumentsBatchResponse
from .vespa_document import VespaDocument
40 changes: 26 additions & 14 deletions src/marqo/vespa/vespa_client.py
@@ -11,13 +11,14 @@

import httpcore
import httpx
import orjson

import marqo.logging
import marqo.vespa.concurrency as conc
from marqo.core.models import MarqoIndex
from marqo.vespa.exceptions import (VespaStatusError, VespaError, InvalidVespaApplicationError,
VespaTimeoutError, VespaNotConvergedError, VespaActivationConflictError)
from marqo.vespa.models import VespaDocument, QueryResult, FeedBatchDocumentResponse, FeedBatchResponse, \
from marqo.vespa.models import VespaDocument, QueryResult, Error, FeedBatchDocumentResponse, FeedBatchResponse, \
FeedDocumentResponse, UpdateDocumentsBatchResponse, UpdateDocumentResponse, FeedBatchDocumentResponse
from marqo.vespa.models.application_metrics import ApplicationMetrics
from marqo.vespa.models.delete_document_response import DeleteDocumentResponse, DeleteBatchDocumentResponse, \
@@ -245,7 +246,7 @@ def query(self, yql: str, hits: int = 10, ranking: str = None, model_restrict: s

self._query_raise_for_status(resp)

return QueryResult(**resp.json())
return QueryResult(**orjson.loads(resp.text))

def feed_document(self, document: VespaDocument, schema: str, timeout: int = 60) -> FeedDocumentResponse:
"""
@@ -1008,9 +1009,26 @@ async def _delete_document_async(self,

self._raise_for_status(resp)

@classmethod
def _is_timeout_error(cls, error: Error, resp: httpx.Response) -> bool:
"""
Check if the query error is a timeout error.
"""

if error.code == 8 and error.message == "Search request soft doomed during query setup and initialization.":
logger.warn('Detected soft doomed query')
return True
if error.code == 12 and resp.status_code == 504:
return True

return False

def _query_raise_for_status(self, resp: httpx.Response) -> None:
"""
Query API specific raise for status method.
If multiple errors:
If all errors are timeout, raise VespaTimeoutError (504).
If even one error is not timeout, raise VespaStatusError (500).
"""
# See error codes here https://github.com/vespa-engine/vespa/blob/master/container-core/src/main/java/com/yahoo/container/protect/Error.java
try:
@@ -1022,18 +1040,12 @@ def _query_raise_for_status(self, resp: httpx.Response) -> None:
result.root.errors is not None
and len(result.root.errors) > 0
):
if resp.status_code == 504 and result.root.errors[0].code == 12:
raise VespaTimeoutError(message=resp.text, cause=e) from e
elif (
result.root.errors[0].code == 8
and result.root.errors[
0].message == "Search request soft doomed during query setup and initialization."
):
# The soft doom error is a bug in certain Vespa versions. Newer versions should always return
# a code 12 for timeouts
logger.warning('Detected soft doomed query')
raise VespaTimeoutError(message=resp.text, cause=e) from e

for error in result.root.errors:
if not self._is_timeout_error(error, resp):
# Raise 500 if any error is not timeout
raise VespaStatusError(message=resp.text, cause=e) from e
# Raise 504 if all errors are timeout
raise VespaTimeoutError(message=resp.text, cause=e) from e
raise e
except VespaStatusError:
raise
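
The aggregation rule in `_query_raise_for_status` can be sketched on its own. This is a minimal illustration, not Marqo's implementation: `Error` below is a hypothetical stand-in carrying only `code` and `message`, `status_for_errors` is an invented helper that returns a status code instead of raising, and it assumes the caller passes a non-empty error list, as the hunk above does.

```python
from dataclasses import dataclass
from typing import List

SOFT_DOOM_MESSAGE = "Search request soft doomed during query setup and initialization."


@dataclass
class Error:
    # Hypothetical stand-in for marqo.vespa.models.Error; only the fields used here.
    code: int
    message: str = ""


def is_timeout_error(error: Error, http_status: int) -> bool:
    # Code 8 with the soft-doom message is treated as a timeout regardless of HTTP status.
    if error.code == 8 and error.message == SOFT_DOOM_MESSAGE:
        return True
    # Code 12 counts as a timeout only when the HTTP status is 504.
    return error.code == 12 and http_status == 504


def status_for_errors(errors: List[Error], http_status: int) -> int:
    """Return 504 only when every error is a timeout, otherwise 500.

    Assumes `errors` is non-empty.
    """
    for error in errors:
        if not is_timeout_error(error, http_status):
            return 500
    return 504


all_timeouts = status_for_errors([Error(12), Error(8, SOFT_DOOM_MESSAGE)], 504)
mixed = status_for_errors([Error(12), Error(4, "some other failure")], 504)
print(all_timeouts, mixed)
```

According to the commit message, multiple errors can occur in a hybrid search with the `disjunction` retrieval method. Under this rule, a single non-timeout error is enough to yield 500.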
2 changes: 1 addition & 1 deletion tests/api_tests/v1/scripts/start_vespa.py
@@ -21,7 +21,7 @@

import requests

VESPA_VERSION=os.getenv('VESPA_VERSION', '8.431.32')
VESPA_VERSION=os.getenv('VESPA_VERSION', '8.472.109')


def start_vespa() -> None:
13 changes: 12 additions & 1 deletion tests/integ_tests/core/inference/test_open_clip_model_load.py
@@ -274,4 +274,15 @@ def test_load_OpenCLIPModel_with_auth_hf(self):
mock_download_model.assert_called_once_with(
repo_location=ModelLocation(**model_properties["model_location"]),
auth=model_auth,
)
)

def test_load_legacy_openai_clip_model(self):
"""A test to ensure old OpenAI CLIP models (e.g., ViT-B/32) are loaded correctly."""
model_properties = {
"name": "ViT-B/32", # Old OpenAI CLIP model name
"type": "open_clip",
"url": "https://github.com/mlfoundations/open_clip/releases/download/v0.2-weights/vit_b_32-quickgelu-laion400m_e32-46683a32.pt",
"dimensions": 512
}
model = OPEN_CLIP(model_properties=model_properties, device="cpu")
model.load()
11 changes: 6 additions & 5 deletions tests/integ_tests/core/vespa_index/test_add_documents_handler.py
@@ -9,6 +9,8 @@
InternalError
from marqo.core.inference.tensor_fields_container import TensorFieldsContainer
from marqo.core.models.add_docs_params import AddDocsParams, BatchVectorisationMode
from marqo.core.inference.tensor_fields_container import TensorFieldsContainer
from marqo.core.exceptions import DuplicateDocumentError, AddDocumentsError, MarqoDocumentParsingError, InternalError
from marqo.core.models.marqo_add_documents_response import MarqoAddDocumentsItem
from marqo.core.models.marqo_index import FieldType
from marqo.core.unstructured_vespa_index.unstructured_add_document_handler import \
@@ -17,6 +19,7 @@
from marqo.s2_inference import s2_inference
from marqo.s2_inference.errors import S2InferenceError
from marqo.s2_inference.types import Modality
from marqo.s2_inference.multimodal_model_load import Modality
from marqo.vespa.models import VespaDocument, FeedBatchResponse, FeedBatchDocumentResponse
from marqo.vespa.models.get_document_response import Document, GetBatchResponse, GetBatchDocumentResponse
from integ_tests.marqo_test import MarqoTestCase
@@ -354,9 +357,7 @@ def test_unstructured_add_documents_handler_infer_modality_logic_image_false_and
]
for url, test_case in test_cases:
with self.subTest(msg=test_case):
with (patch(
"marqo.core.unstructured_vespa_index.unstructured_add_document_handler.infer_modality") as
mock_infer_modality):
with patch("marqo.core.unstructured_vespa_index.unstructured_add_document_handler.infer_modality") as mock_infer_modality:
self.assertEqual(
FieldType.Text, unstructured_add_documents_handler.
_infer_field_type(field_name="dummy_field_name", field_content=url)
@@ -466,8 +467,8 @@ def test_collect_tensor_field_content_infer_modality_logic(self):

for field_name, msg, called in test_cases:
with self.subTest(f"{field_name} - {msg}"):
with (patch("marqo.core.unstructured_vespa_index.unstructured_add_document_handler.infer_modality",
return_value=Modality.TEXT) as mock_infer_modality):
with patch("marqo.core.unstructured_vespa_index.unstructured_add_document_handler.infer_modality",
return_value=Modality.TEXT) as mock_infer_modality:
_ = unstructured_add_documents_handler._handle_field(
test_doc, field_name=field_name,
field_content=test_doc[field_name]