Dupe IDs are handled when use_existing_tensors=True #390

Merged
merged 8 commits into from Mar 16, 2023
Changes from 2 commits
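For context, a minimal sketch of the behaviour this PR covers, mirroring the calls in the test diffs below (the config object and index name are placeholders, and the returned doc is assumed to carry its fields top-level, as in the tests):

from marqo.tensor_search import tensor_search

# Two docs in one batch share an _id: only the last occurrence should be indexed,
# even when use_existing_tensors=True lets unchanged fields reuse stored vectors.
tensor_search.add_documents(config=config, index_name="my-index", docs=[
    {"_id": "3", "title": "doc 3a"},
    {"_id": "3", "title": "doc 3b"},
], auto_refresh=True, use_existing_tensors=True)

doc = tensor_search.get_document_by_id(
    config=config, index_name="my-index", document_id="3", show_vectors=True)
assert doc["title"] == "doc 3b"  # the latest duplicate wins, never the first/middle one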
25 changes: 15 additions & 10 deletions src/marqo/tensor_search/tensor_search.py
@@ -373,6 +373,7 @@ def _infer_opensearch_data_type(

def _get_chunks_for_field(field_name: str, doc_id: str, doc):
# Find the chunks with a specific __field_name in a doc
+ # Note: for a chunkless doc (nothing was tensorised) --> doc["_source"]["__chunks"] == []
return [chunk for chunk in doc["_source"]["__chunks"] if chunk["__field_name"] == field_name]
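To illustrate the helper above, a quick sketch (chunk dicts carry the __field_name, __field_content and __vector_* keys used in the mget projection later in this file; the values are made up):

doc = {"_source": {"__chunks": [
    {"__field_name": "title", "__field_content": "hello", "__vector_title": [0.1, 0.2]},
    {"__field_name": "body", "__field_content": "world", "__vector_body": [0.3, 0.4]},
]}}
# Only chunks whose __field_name matches are returned.
assert _get_chunks_for_field(field_name="title", doc_id="123", doc=doc) == [
    doc["_source"]["__chunks"][0]]
# Chunkless doc: nothing was tensorised, so "__chunks" is [] and the result is [].
assert _get_chunks_for_field(field_name="title", doc_id="123",
                             doc={"_source": {"__chunks": []}}) == []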


@@ -547,7 +548,6 @@ def add_documents(config: Config, index_name: str, docs: List[dict], auto_refres
# Check if content of this field changed. If no, skip all chunking and vectorisation
if ((update_mode == 'replace') and use_existing_tensors and existing_doc["found"]
and (field in existing_doc["_source"]) and (existing_doc["_source"][field] == field_content)):
# logger.info(f"Using existing vectors for doc {doc_id}, field {field}. Content remains unchanged.")
chunks_to_append = _get_chunks_for_field(field_name=field, doc_id=doc_id, doc=existing_doc)

# Chunk and vectorise, since content changed.
@@ -657,9 +657,7 @@ def add_documents(config: Config, index_name: str, docs: List[dict], auto_refres
TensorField.field_content: text_chunk,
TensorField.field_name: field
})

- # Add chunks_to_append along with doc metadata to total chunks


elif isinstance(field_content, dict):
if mappings[field]["type"]=="multimodal_combination":
combo_chunk, combo_document_is_valid, unsuccessful_doc_to_append, combo_vectorise_time_to_add,\
@@ -675,9 +673,11 @@ def add_documents(config: Config, index_name: str, docs: List[dict], auto_refres
if field not in new_obj_fields:
new_obj_fields[field] = set()
new_obj_fields[field] = new_obj_fields[field].union(new_fields_from_multimodal_combination)
+ # TODO: we may want to use chunks_to_append here to make it uniform with use_existing_tensors and normal vectorisation
chunks.append({**combo_chunk, **chunk_values_for_filtering})
continue


+ # Add chunks_to_append along with doc metadata to total chunks
for chunk in chunks_to_append:
chunks.append({**chunk, **chunk_values_for_filtering})

@@ -899,6 +899,7 @@ def _get_documents_for_upsert(
f"set by the environment variable `{EnvVars.MARQO_MAX_RETRIEVABLE_DOCS}`")

# Chunk Docs (get field name, field content, vectors)

chunk_docs = [
{"_index": index_name, "_id": doc_id,
"_source": {"include": [f"__chunks.__field_content", f"__chunks.__field_name", f"__chunks.__vector_*"]}}
@@ -919,9 +920,11 @@ def _get_documents_for_upsert(

# Combine the 2 query results (loop through each doc id)
combined_result = []
- for doc_id in document_ids:

+ for doc_id in valid_doc_ids:
# There should always be 2 results per doc.
result_list = [doc for doc in res["docs"] if doc["_id"] == doc_id]

if len(result_list) == 0:
continue
if len(result_list) not in (2, 0):
@@ -931,7 +934,7 @@ def _get_documents_for_upsert(
for result in result_list:
if result["found"]:
doc_in_results = True
if result["_source"]["__chunks"] == []:
if ("__chunks" in result["_source"]) and (result["_source"]["__chunks"] == []):
res_data = result
else:
res_chunks = result
@@ -940,12 +943,14 @@ def _get_documents_for_upsert(
dummy_res = result
Review comment (Collaborator):
nit: may be more appropriate to call this something like not_found_res

break

- # Put the chunks list in res_data, so it's complete
+ # Put the chunks list in res_data, so it contains all doc data
if doc_in_results:
res_data["_source"]["__chunks"] = res_chunks["_source"]["__chunks"]
+ # Only add chunks if not a chunkless doc
+ if res_chunks["_source"]:
+     res_data["_source"]["__chunks"] = res_chunks["_source"]["__chunks"]
combined_result.append(res_data)
else:
- # This result just says that the doc was not found
# This result just says that the doc was not found ("found": False)
combined_result.append(dummy_res)

res["docs"] = combined_result
63 changes: 60 additions & 3 deletions tests/tensor_search/test_add_documents.py
@@ -69,7 +69,6 @@ def test_add_plain_id_field(self):

def test_add_documents_dupe_ids(self):
"""
- TODO
Should only use the latest inserted ID. Make sure it doesn't get the first/middle one
"""

@@ -305,16 +304,74 @@ def test_add_documents_validation(self):
{"_id": "to_fail_567", "some other obj": AssertionError}],
[{"_id": "to_fail_567", "blahblah": max}]
]
- for update_mode in ('replace', 'update'):

# For update
for bad_doc_arg in bad_doc_args:
add_res = tensor_search.add_documents(
config=self.config, index_name=self.index_name_1,
docs=bad_doc_arg, auto_refresh=True, update_mode='update')
assert add_res['errors'] is True
assert all(['error' in item for item in add_res['items'] if item['_id'].startswith('to_fail')])
assert all(['result' in item
for item in add_res['items'] if item['_id'].startswith('to_pass')])

# For replace, check with use_existing_tensors True and False
for use_existing_tensors_flag in (True, False):
for bad_doc_arg in bad_doc_args:
add_res = tensor_search.add_documents(
config=self.config, index_name=self.index_name_1,
- docs=bad_doc_arg, auto_refresh=True, update_mode=update_mode)
+ docs=bad_doc_arg, auto_refresh=True, update_mode='replace', use_existing_tensors=use_existing_tensors_flag)
assert add_res['errors'] is True
assert all(['error' in item for item in add_res['items'] if item['_id'].startswith('to_fail')])
assert all(['result' in item
for item in add_res['items'] if item['_id'].startswith('to_pass')])


def test_add_documents_id_validation(self):
"""These bad docs should return errors"""
bad_doc_args = [
# Wrong data types for ID
# Tuple: (doc_list, number of docs that should succeed)
([{"_id": {}, "field_1": 1234}], 0),
([{"_id": dict(), "field_1": 1234}], 0),
([{"_id": [1, 2, 3], "field_1": 1234}], 0),
([{"_id": 4, "field_1": 1234}], 0),
([{"_id": None, "field_1": 1234}], 0),

([{"_id": "proper id", "field_1": 5678},
{"_id": ["bad", "id"], "field_1": "zzz"},
{"_id": "proper id 2", "field_1": 90}], 2)
]

# For update
for bad_doc_arg in bad_doc_args:
add_res = tensor_search.add_documents(
config=self.config, index_name=self.index_name_1,
docs=bad_doc_arg[0], auto_refresh=True, update_mode='update')

assert add_res['errors'] is True

succeeded_count = 0
for item in add_res['items']:
if 'result' in item:
succeeded_count += 1

assert succeeded_count == bad_doc_arg[1]

# For replace, check with use_existing_tensors True and False
for use_existing_tensors_flag in (True, False):
for bad_doc_arg in bad_doc_args:
add_res = tensor_search.add_documents(
config=self.config, index_name=self.index_name_1,
docs=bad_doc_arg[0], auto_refresh=True, update_mode='replace', use_existing_tensors=use_existing_tensors_flag)
assert add_res['errors'] is True
succeeded_count = 0
for item in add_res['items']:
if 'result' in item:
succeeded_count += 1

assert succeeded_count == bad_doc_arg[1]

def test_add_documents_list_non_tensor_validation(self):
"""This doc is valid but should return error because my_field is not marked non-tensor"""
bad_doc_args = [
99 changes: 91 additions & 8 deletions tests/tensor_search/test_add_documents_use_existing_tensors.py
Expand Up @@ -86,8 +86,6 @@ def test_use_existing_tensors_non_existing(self):
document_id="123", show_vectors=True)
self.assertEqual(use_existing_tensors_doc, regular_doc)

- tensor_search.delete_index(config=self.config, index_name=self.index_name_1)

tensor_search.add_documents(config=self.config, index_name=self.index_name_1, docs=[
{
"_id": "123",
Expand All @@ -102,7 +100,6 @@ def test_use_existing_tensors_non_existing(self):

def test_use_existing_tensors_dupe_ids(self):
"""
- TODO
Should only use the latest inserted ID. Make sure it doesn't get the first/middle one
"""

@@ -145,7 +142,6 @@ def test_use_existing_tensors_dupe_ids(self):

self.assertEqual(doc_3_solo, doc_3_duped)

- tensor_search.delete_index(config=self.config, index_name=self.index_name_1)
tensor_search.add_documents(config=self.config, index_name=self.index_name_1, docs=[
{
"_id": "1",
@@ -173,12 +169,40 @@
# Needs to be 3b, not 3a
self.assertEqual(doc_3_duped, doc_3_overwritten)

- def test_use_existing_tensors_untensorize_something(self):
+ def test_use_existing_tensors_retensorize_fields(self):
"""
- TODO
- During the initial index, one field is a tensor field
- When we insert the doc again, with use_existing_tensors, we make it a non-tensor-field
+ During the initial index, some fields are non-tensor fields
+ When we insert the doc again, with use_existing_tensors, we make them tensor fields.
+ They should still have no tensors.
"""

tensor_search.add_documents(config=self.config, index_name=self.index_name_1, docs=[
{
"_id": "123",
"title 1": "content 1",
"title 2": 2,
"title 3": True,
"title 4": "content 4"
}], auto_refresh=True, use_existing_tensors=True,
non_tensor_fields=["title 1", "title 2", "title 3", "title 4"])
d1 = tensor_search.get_document_by_id(
config=self.config, index_name=self.index_name_1,
document_id="123", show_vectors=True)
assert len(d1["_tensor_facets"]) == 0

tensor_search.add_documents(config=self.config, index_name=self.index_name_1, docs=[
{
"_id": "123",
"title 1": "content 1",
"title 2": 2,
"title 3": True,
"title 4": "content 4"
}], auto_refresh=True, use_existing_tensors=True)
d2 = tensor_search.get_document_by_id(
config=self.config, index_name=self.index_name_1,
document_id="123", show_vectors=True)

assert len(d2["_tensor_facets"]) == 0

def test_use_existing_tensors_getting_non_tensorised(self):
"""
@@ -209,6 +233,27 @@ def test_use_existing_tensors_getting_non_tensorised(self):
document_id="123", show_vectors=True)
self.assertEqual(d1["_tensor_facets"], d2["_tensor_facets"])

# The only field is a non-tensor field. This makes a chunkless doc.
tensor_search.add_documents(config=self.config, index_name=self.index_name_1, docs=[
{
"_id": "999",
"non-tensor-field": "content 2. blah blah blah"
}], auto_refresh=True, non_tensor_fields=["non-tensor-field"])
d1 = tensor_search.get_document_by_id(
config=self.config, index_name=self.index_name_1,
document_id="999", show_vectors=True)
assert len(d1["_tensor_facets"]) == 0

tensor_search.add_documents(config=self.config, index_name=self.index_name_1, docs=[
{
"_id": "999",
"non-tensor-field": "content 2. blah blah blah"
}], auto_refresh=True, use_existing_tensors=True)
d2 = tensor_search.get_document_by_id(
config=self.config, index_name=self.index_name_1,
document_id="999", show_vectors=True)
self.assertEqual(d1["_tensor_facets"], d2["_tensor_facets"])

def test_use_existing_tensors_check_updates(self):
""" Check to see if the document has been appropriately updated
"""
Expand Down Expand Up @@ -452,3 +497,41 @@ def run():
return True

assert run()


def test_use_existing_tensors_all_data_types(self):
"""
Ensure no errors occur even with chunkless docs. (only int, only bool, etc)
Replacing doc doesn't change the content
"""
doc_args = [
# Each doc only has 1 type
[{"_id": "1", "title": "hello world"},
{"_id": "2", "title": True},
{"_id": "3", "title": 12345},
{"_id": "4", "title": [1, 2, 3]}],

# Doc with all types
[{"_id": "1", "field1": "hello world", "field2": True, "field3": 12345, "field4": [1, 2, 3]}]
]

for doc_arg in doc_args:
# Add doc normally without use_existing_tensors
add_res = tensor_search.add_documents(
config=self.config, index_name=self.index_name_1,
docs=doc_arg, auto_refresh=True, update_mode='replace')

d1 = tensor_search.get_documents_by_ids(
config=self.config, index_name=self.index_name_1,
document_ids=[doc["_id" ]for doc in doc_arg], show_vectors=True)

# Then replace doc with use_existing_tensors
add_res = tensor_search.add_documents(
config=self.config, index_name=self.index_name_1,
docs=doc_arg, auto_refresh=True, update_mode='replace', use_existing_tensors=True)

d2 = tensor_search.get_documents_by_ids(
config=self.config, index_name=self.index_name_1,
document_ids=[doc["_id" ]for doc in doc_arg], show_vectors=True)

self.assertEqual(d1, d2)