Skip to content

Commit

Permalink
Partial updates for unstructured indexes (#1110)
Browse files Browse the repository at this point in the history
Co-authored-by: yihanzhao <yihan@marqo.ai>
  • Loading branch information
adityabharadwaj198 and papa99do authored Feb 27, 2025
1 parent e556293 commit cc707da
Show file tree
Hide file tree
Showing 47 changed files with 4,154 additions and 247 deletions.
1 change: 1 addition & 0 deletions src/marqo/api/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def default_env_vars() -> dict:
EnvVars.MARQO_INFERENCE_CACHE_TYPE: "LRU",
EnvVars.MARQO_BEST_AVAILABLE_DEVICE: "cpu", # on_start_script will determine this.
EnvVars.MARQO_MAX_TENSOR_FIELD_COUNT_UNSTRUCTURED: 100,
EnvVars.MARQO_MAX_STRING_ARRAY_FIELD_COUNT_UNSTRUCTURED: 100,
EnvVars.MARQO_MAX_LEXICAL_FIELD_COUNT_UNSTRUCTURED: 100,
EnvVars.MARQO_INDEX_DEPLOYMENT_LOCK_TIMEOUT: 5, # index operations acquire this distributed lock with a timeout
EnvVars.ZOOKEEPER_CONNECTION_TIMEOUT: 15,
Expand Down
3 changes: 2 additions & 1 deletion src/marqo/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
MARQO_DOC_CHUNKS = 'chunks'
MARQO_DOC_EMBEDDINGS = 'embeddings'
MARQO_DOC_ID = '_id'
MARQO_FIELD_TYPES = "field_types"

MARQO_SEARCH_METHOD_TENSOR = 'tensor'
MARQO_SEARCH_METHOD_LEXICAL = 'lexical'
Expand All @@ -29,4 +30,4 @@
QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_TENSOR = 'marqo__add_weights_tensor'
QUERY_INPUT_SCORE_MODIFIERS_MULT_WEIGHTS_GLOBAL = 'marqo__mult_weights_global'
QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_GLOBAL = 'marqo__add_weights_global'
MARQO_GLOBAL_SCORE_MODIFIERS = 'global'
MARQO_GLOBAL_SCORE_MODIFIERS = 'global'
131 changes: 98 additions & 33 deletions src/marqo/core/document/document.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
from timeit import default_timer as timer
from typing import Dict, List, Tuple, Optional

import semver

import marqo.api.exceptions as api_exceptions
from marqo.core.constants import MARQO_DOC_ID
from marqo.core.models.add_docs_params import AddDocsParams
from marqo.core.exceptions import UnsupportedFeatureError, ParsingError, InternalError
from marqo.core.exceptions import UnsupportedFeatureError, ParsingError, InternalError, MarqoDocumentParsingError
from marqo.core.index_management.index_management import IndexManagement
from marqo.core.models.marqo_add_documents_response import MarqoAddDocumentsResponse, MarqoAddDocumentsItem
from marqo.core.models.marqo_index import IndexType, SemiStructuredMarqoIndex, StructuredMarqoIndex, \
UnstructuredMarqoIndex
from marqo.core.models.marqo_update_documents_response import MarqoUpdateDocumentsResponse, MarqoUpdateDocumentsItem
from marqo.core.semi_structured_vespa_index.common import SEMISTRUCTURED_INDEX_PARTIAL_UPDATE_SUPPORT_VERSION, \
VESPA_FIELD_ID, INT_FIELDS, FLOAT_FIELDS, VESPA_DOC_FIELD_TYPES, VESPA_DOC_CREATE_TIMESTAMP
from marqo.core.semi_structured_vespa_index.semi_structured_add_document_handler import \
SemiStructuredAddDocumentsHandler, SemiStructuredFieldCountConfig
from marqo.core.structured_vespa_index.structured_add_document_handler import StructuredAddDocumentsHandler
from marqo.core.unstructured_vespa_index.unstructured_add_document_handler import UnstructuredAddDocumentsHandler
from marqo.core.vespa_index.vespa_index import for_marqo_index as vespa_index_factory
from marqo.logging import get_logger
from marqo.tensor_search.telemetry import RequestMetricsStore
from marqo.vespa.models import UpdateDocumentsBatchResponse, VespaDocument
from marqo.vespa.models.delete_document_response import DeleteAllDocumentsResponse
from marqo.vespa.models.feed_response import FeedBatchResponse
from marqo.vespa.vespa_client import VespaClient
from marqo.version import get_version

logger = get_logger(__name__)

Expand Down Expand Up @@ -95,7 +101,7 @@ def partial_update_documents(self, partial_documents: List[Dict], marqo_index) \
If the document does not exist, this document will error out and the error will be returned in the response.
Args:
partial_documents: A list of documents to partially update
partial_documents: A list of documents to partially update received in the request
marqo_index: The index object to partially update documents in
Raises:
Expand All @@ -104,11 +110,15 @@ def partial_update_documents(self, partial_documents: List[Dict], marqo_index) \
Return:
MarqoUpdateDocumentsResponse containing the response of the partial update operation
"""
if marqo_index.type in [IndexType.Unstructured, IndexType.SemiStructured]:
if marqo_index.type is IndexType.Unstructured:
raise UnsupportedFeatureError("Partial document update is not supported for unstructured indexes. "
"Please use add_documents with use_existing_tensor=True instead")
elif marqo_index.type == IndexType.Structured:
elif marqo_index.type is IndexType.Structured:
pass
elif marqo_index.type is IndexType.SemiStructured:
if marqo_index.parsed_marqo_version() < SEMISTRUCTURED_INDEX_PARTIAL_UPDATE_SUPPORT_VERSION: # Partial updates for semi-structured indexes are only supported for Marqo version >= 2.16.0
raise UnsupportedFeatureError("Partial document update is not supported for this index version. "
"Please upgrade the index version, or create a new index to use this feature.")
else:
raise ValueError(f"Invalid index type: {marqo_index.type}")

Expand All @@ -118,24 +128,37 @@ def partial_update_documents(self, partial_documents: List[Dict], marqo_index) \
unsuccessful_docs: List[Tuple[int, MarqoUpdateDocumentsItem]] = []

# Remove duplicated documents based on _id
partial_documents, _ = self.remove_duplicated_documents(partial_documents)
partial_documents, doc_ids, documents_that_contain_maps = self.process_documents(partial_documents,
unsuccessful_docs, is_index_semi_structured=marqo_index.type is IndexType.SemiStructured)
existing_vespa_documents = {}

if marqo_index.type is IndexType.SemiStructured and documents_that_contain_maps: # Only retrieve the document back if the partial update request contains maps and the index is semi-structured
get_batch_response = self.vespa_client.get_batch(ids = list(documents_that_contain_maps), fields = [
VESPA_FIELD_ID, INT_FIELDS, FLOAT_FIELDS, VESPA_DOC_FIELD_TYPES, VESPA_DOC_CREATE_TIMESTAMP], schema = marqo_index.schema_name)
responses = get_batch_response.responses
for resp in responses:
existing_vespa_documents[resp.document.fields[VESPA_FIELD_ID]] = resp.document.dict()

for index, doc in enumerate(partial_documents):
try:
vespa_document = VespaDocument(**vespa_index.to_vespa_partial_document(doc))
vespa_document = VespaDocument(**vespa_index.to_vespa_partial_document(doc, existing_vespa_documents.get(doc.get(MARQO_DOC_ID, ''), None)))
vespa_documents.append(vespa_document)
except ParsingError as e:
unsuccessful_docs.append(
(index, MarqoUpdateDocumentsItem(id=doc.get(MARQO_DOC_ID, ''), error=e.message,
status=int(api_exceptions.InvalidArgError.status_code))))

vespa_res: UpdateDocumentsBatchResponse = (
self.vespa_client.update_documents_batch(vespa_documents,
marqo_index.schema_name,
vespa_id_field=vespa_index.get_vespa_id_field()))
with RequestMetricsStore.for_request().time("partial_update.vespa._bulk"):
vespa_res: UpdateDocumentsBatchResponse = (
self.vespa_client.update_documents_batch(vespa_documents,
marqo_index.schema_name,
vespa_id_field=vespa_index.get_vespa_id_field()))

with RequestMetricsStore.for_request().time("partial_update.postprocess"):
result = self._translate_update_document_response(vespa_res, unsuccessful_docs,
marqo_index.name, start_time)

return self._translate_update_document_response(vespa_res, unsuccessful_docs,
marqo_index.name, start_time)
return result

def _translate_update_document_response(self, responses: UpdateDocumentsBatchResponse, unsuccessful_docs: List,
index_name: str, start_time) \
Expand Down Expand Up @@ -170,34 +193,76 @@ def _translate_update_document_response(self, responses: UpdateDocumentsBatchRes
return MarqoUpdateDocumentsResponse(errors=errors, index_name=index_name, items=items,
processingTimeMs=(timer() - start_time) * 1000)

def remove_duplicated_documents(self, documents: List) -> Tuple[List, set]:
"""Remove duplicated documents based on _id in the given list of documents.
def process_documents(self, documents: List[Dict], unsuccessful_docs: List[Tuple[int, MarqoUpdateDocumentsItem]],
is_index_semi_structured = False) -> Tuple[List, set, set]:
"""Process documents to remove duplicates and identify documents containing maps.
This method combines duplicate removal and map detection into a single pass through
the documents for better efficiency.
For a list of documents, if there exists duplicate _id, the last document will be used while the
previous ones will be removed from the list.
This function does not validate the documents, it only removes the duplicates based on _id fields.
Args:
is_index_semi_structured: True if the index currently being processed is of type SemiStructured
unsuccessful_docs: A list of documents which were processed unsuccessfully
documents: List of document dictionaries to process
Returns:
Tuple containing:
- List of deduplicated documents
- Set of unique document IDs
- Set of document IDs that contain dictionary values
"""
# Deduplicate docs, keep the latest
docs = []
doc_ids = set()
documents_with_maps = set()

# Process documents in reverse to keep latest version of duplicates
for i in range(len(documents) - 1, -1, -1):
doc = documents[i]

if isinstance(doc, dict) and '_id' in doc:
doc_id = doc['_id']
try:
if doc_id is not None and doc_id in doc_ids:
logger.debug(f'Duplicate document ID {doc_id} found, keeping the latest')
continue
doc_ids.add(doc_id)
except TypeError as e: # Happens if ID is a non-hashable type -- ID validation will catch this later on
logger.debug(f'Could not hash document ID {doc_id}: {e}')

docs.append(doc)
# Reverse to preserve order in request

if not isinstance(doc, dict) or '_id' not in doc:
docs.append(doc)
continue

doc_id = doc['_id']

try:
# Skip if we've already seen this ID
if doc_id is not None and doc_id in doc_ids:
logger.debug(f'Duplicate document ID {doc_id} found, keeping the latest')
continue

# Check for dictionary values while processing doc to populate the documents_with_maps set.
# Only do it in case of semi-structured indexes.
if is_index_semi_structured:
for field_name, field_value in doc.items():
if isinstance(field_value, dict):
if len(field_value) == 0: # If the dictionary is empty, get back the document so that we can update the doc with an empty dictionary (i.e remove the map from the doc).
documents_with_maps.add(doc_id)
else:
for key, val in field_value.items():
if isinstance(val, (int, float)):
documents_with_maps.add(doc_id)
break
else:
raise MarqoDocumentParsingError(
f'Unsupported field type {type(val)} for field {field_name} in doc {doc_id}. We only support int and float types for map values when updating a document.'
)
break
doc_ids.add(doc_id)
docs.append(doc)

except TypeError as e:
logger.debug(f'Could not hash document ID {doc_id}: {e}')
docs.append(doc)

except MarqoDocumentParsingError as e:
unsuccessful_docs.append((i, MarqoUpdateDocumentsItem(id=doc.get(MARQO_DOC_ID, ''),
error=e.message,
status=int(api_exceptions.InvalidArgError.status_code))))

# Reverse to preserve original order
docs.reverse()
return docs, doc_ids
return docs, doc_ids, documents_with_maps

def translate_add_documents_response(self, responses: Optional[FeedBatchResponse],
index_name: str,
Expand Down
3 changes: 2 additions & 1 deletion src/marqo/core/index_management/index_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ def is_subset(dict_a, dict_b):
return all(k in dict_b and dict_b[k] == v for k, v in dict_a.items())

if (is_subset(marqo_index.tensor_field_map, existing_index.tensor_field_map) and
is_subset(marqo_index.field_map, existing_index.field_map)):
is_subset(marqo_index.field_map, existing_index.field_map) and
is_subset(marqo_index.name_to_string_array_field_map, existing_index.name_to_string_array_field_map)):
logger.debug(f'Another thread has updated the index {marqo_index.name} already.')
return

Expand Down
35 changes: 33 additions & 2 deletions src/marqo/core/models/marqo_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ def check_all_fields(cls, values):

return values

class StringArrayField(ImmutableStrictBaseModel):
    """A string-array field tracked by a semi-structured Marqo index."""
    # User-facing field name, as supplied in documents; it does not carry the
    # marqo__string_array prefix.
    name: str
    # Declared field type for this field.
    type: FieldType
    # Internal schema field name (prefixed with marqo__string_array) used in the
    # Vespa index schema; None when no schema-level name has been assigned.
    string_array_field_name: Optional[str]
    # Field features enabled for this field; empty by default.
    features: List[FieldFeature] = []


class TensorField(ImmutableStrictBaseModel):
"""
Expand Down Expand Up @@ -505,6 +511,7 @@ class SemiStructuredMarqoIndex(UnstructuredMarqoIndex):
type: IndexType = IndexType.SemiStructured
lexical_fields: List[Field]
tensor_fields: List[TensorField]
string_array_fields: Optional[List[StringArrayField]] # This is required so that when saving a document containing string array fields, we can make changes to the schema on the fly. Ref: https://github.com/marqo-ai/marqo/blob/cfea70adea7039d1586c94e36adae8e66cabe306/src/marqo/core/semi_structured_vespa_index/semi_structured_vespa_schema_template_2_16.sd.jinja2#L83

def __init__(self, **data):
super().__init__(**data)
Expand All @@ -519,8 +526,32 @@ def field_map(self) -> Dict[str, Field]:
A map from field name to the field.
"""
return self._cache_or_get('field_map',
lambda: {field.name: field for field in self.lexical_fields}
)
lambda: {field.name: field for field in self.lexical_fields})

@property
def name_to_string_array_field_map(self):
    """
    A map from a StringArrayField object's ``name`` property to the corresponding
    StringArrayField object.

    ``name`` is the user-facing name of the StringArrayField object, as passed by
    the user; it does not start with the marqo__string_array prefix.

    Returns an empty dict if string_array_fields is None.
    """

    # Result is memoised via _cache_or_get so the dict is built at most once.
    return self._cache_or_get('name_to_string_array_field_map',
                              lambda : {} if self.string_array_fields is None
                              else {field.name: field for field in self.string_array_fields})

@property
def string_array_field_name_to_string_array_field_map(self):
    """
    A map from a StringArrayField object's ``string_array_field_name`` property to
    the corresponding StringArrayField object.

    A ``string_array_field_name`` is the name of a StringArrayField object as used
    in the index schema; it starts with the marqo__string_array prefix.

    Returns an empty dict if string_array_fields is None.
    """
    # NOTE(review): cache key 'string_array_field_map' does not match the property
    # name -- harmless as long as it stays unique, but confirm it is intentional.
    return self._cache_or_get('string_array_field_map',
                              lambda : {} if self.string_array_fields is None
                              else {field.string_array_field_name: field for field in self.string_array_fields})

@property
def lexical_field_map(self) -> Dict[str, Field]:
Expand Down
10 changes: 9 additions & 1 deletion src/marqo/core/semi_structured_vespa_index/common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import semver

VESPA_FIELD_ID = "marqo__id"
STRINGS = "marqo__strings"
SHORT_STRINGS_FIELDS = "marqo__short_string_fields"
Expand All @@ -20,6 +22,10 @@
MARQO_DOC_MULTIMODAL_PARAMS = "multimodal_params"
VESPA_DOC_MULTIMODAL_PARAMS = "marqo__multimodal_params"

# A metadata field that's used to store a dictionary of key-value pairs where key is the field name and value is a string denoting the field type
VESPA_DOC_FIELD_TYPES = "marqo__field_types"
VESPA_DOC_CREATE_TIMESTAMP = "marqo__create_timestamp"

SUMMARY_ALL_NON_VECTOR = 'all-non-vector-summary'
SUMMARY_ALL_VECTOR = 'all-vector-summary'

Expand Down Expand Up @@ -50,4 +56,6 @@
QUERY_INPUT_HYBRID_FIELDS_TO_RANK_TENSOR = "marqo__fields_to_rank_tensor"

VESPA_DOC_HYBRID_RAW_TENSOR_SCORE = 'marqo__raw_tensor_score'
VESPA_DOC_HYBRID_RAW_LEXICAL_SCORE = 'marqo__raw_lexical_score'
VESPA_DOC_HYBRID_RAW_LEXICAL_SCORE = 'marqo__raw_lexical_score'

SEMISTRUCTURED_INDEX_PARTIAL_UPDATE_SUPPORT_VERSION = semver.VersionInfo.parse("2.16.0")  # The Marqo version from which partial-update support for semi-structured indexes was added
15 changes: 15 additions & 0 deletions src/marqo/core/semi_structured_vespa_index/marqo_field_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from enum import Enum


class MarqoFieldTypes(Enum):
    """
    Enum of Marqo field types.

    Used to specify the type of a field in a Marqo index; each member maps a
    symbolic name to the string identifier stored for that type.
    """

    BOOL = 'bool'                    # boolean value
    INT_MAP = 'int_map_entry'        # entry of a map with integer values
    FLOAT_MAP = 'float_map_entry'    # entry of a map with float values
    INT = 'int'                      # integer value
    FLOAT = 'float'                  # float value
    STRING_ARRAY = 'string_array'    # array of strings
    STRING = 'string'                # single string value
    TENSOR = 'tensor'                # tensor (vector) field
Loading

0 comments on commit cc707da

Please sign in to comment.