Disable auto create index (#516)
* Remove auto index creation, fail instead

* Fix add_documents test

* Fix TestVectorSearch tests

* Fix TestAddDocumentsPara tests

* Fix TestIndexMetaCache and TestLexicalSearch tests

* Fix more tests

* Fix new test test_vector_text_search_no_device

* Fix tensor_search change lost in merge

* Update README for new index creation behaviour

* Add test for add documents when missing index

* Fix tests from merged from mainline

* Clean up test indices in base test class tear down

* Remove implicit index test -- not applicable any more

* Fix whitespace test

---------

Co-authored-by: Farshid Zavareh <farshid@marqo.ai>
farshidz authored Jun 30, 2023
1 parent c2f8811 commit 5bcf3ae
Showing 17 changed files with 194 additions and 182 deletions.
9 changes: 6 additions & 3 deletions README.md
@@ -103,6 +103,8 @@ import marqo

mq = marqo.Client(url='http://localhost:8882')

+ mq.create_index(index_name="my-first-index")
+
mq.index("my-first-index").add_documents([
    {
        "Title": "The Travels of Marco Polo",
@@ -122,11 +124,10 @@ results = mq.index("my-first-index").search(

```

- - `mq` is the client that wraps the `marqo` API
+ - `mq` is the client that wraps the `marqo` API.
+ - `create_index()` creates a new index with default settings.
- `add_documents()` takes a list of documents, represented as python dicts for indexing.
- - `add_documents()` creates an index with default settings, if one does not already exist.
- You can optionally set a document's ID with the special `_id` field. Otherwise, Marqo will generate one.
- - If the index doesn't exist, Marqo will create it. If it exists then Marqo will add the documents to the index.

Let's have a look at the results:

@@ -273,6 +274,8 @@ import pprint

mq = marqo.Client(url="http://localhost:8882")

+ mq.create_index("my-weighted-query-index")
+
mq.index("my-weighted-query-index").add_documents(
[
{
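To illustrate the behaviour these README edits document: with auto-creation removed, adding documents to an index that was never created now fails instead of silently creating one. A minimal sketch against a local Marqo instance (index names are illustrative, and the exact client-side exception type is an assumption rather than something this diff confirms):

```python
import marqo
from marqo.errors import MarqoWebError  # assumed client-side error type

mq = marqo.Client(url="http://localhost:8882")

# After this change, add_documents() no longer creates a missing index.
try:
    mq.index("no-such-index").add_documents([{"Title": "orphan document"}])
except MarqoWebError as e:
    print(f"add_documents failed as expected: {e}")

# The supported flow: create the index explicitly, then add documents.
mq.create_index(index_name="my-first-index")
mq.index("my-first-index").add_documents([
    {"Title": "The Travels of Marco Polo"}
])
```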
4 changes: 2 additions & 2 deletions src/marqo/tensor_search/backend.py
@@ -28,8 +28,8 @@ def get_index_info(config: Config, index_name: str) -> IndexInfo:
IndexInfo of the index
Raises:
- NonTensorIndexError, if the index's mapping doesn't conform to a Tensor Search index
+ NonTensorIndexError: If the index's mapping doesn't conform to a Tensor Search index.
+ IndexNotFoundError: If index does not exist.
"""
res = HttpRequests(config).get(path=F"{index_name}/_mapping")

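Since `get_index_info` now advertises `IndexNotFoundError`, callers catch it explicitly rather than falling back to index creation. A hedged sketch of the calling pattern this commit applies in `tensor_search.py` (the helper name here is hypothetical):

```python
from marqo import errors
from marqo.tensor_search import backend


def require_index_info(config, index_name: str):
    """Fetch IndexInfo for an existing index, failing loudly when it is missing.

    Mirrors the no-auto-creation pattern in this commit: a missing index
    surfaces as IndexNotFoundError instead of being created on the fly.
    """
    try:
        return backend.get_index_info(config=config, index_name=index_name)
    except errors.IndexNotFoundError:
        raise errors.IndexNotFoundError(
            f"Cannot operate on non-existent index {index_name}"
        )
```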
52 changes: 24 additions & 28 deletions src/marqo/tensor_search/tensor_search.py
@@ -228,14 +228,6 @@ def get_stats(config: Config, index_name: str):
}


- def _check_and_create_index_if_not_exist(config: Config, index_name: str):
-     try:
-         index_info = backend.get_index_info(config=config, index_name=index_name)
-     except errors.IndexNotFoundError as s:
-         create_vector_index(config=config, index_name=index_name)
-         index_info = backend.get_index_info(config=config, index_name=index_name)
-
-
def add_documents_orchestrator(
config: Config, add_docs_params: AddDocsParams,
batch_size: int = 0, processes: int = 1,
@@ -249,13 +241,18 @@ def add_documents_orchestrator(
logger.debug(f"No device given for add_documents_orchestrator. Defaulting to best available device: {selected_device}")
else:
add_docs_params_with_device = add_docs_params

if batch_size is None or batch_size == 0:
logger.debug(f"batch_size={batch_size} and processes={processes} - not doing any marqo side batching")
return add_documents(config=config, add_docs_params=add_docs_params_with_device)
elif processes is not None and processes > 1:
- # create beforehand or pull from the cache so it is up to date for the multi-processing
- _check_and_create_index_if_not_exist(config=config, index_name=add_docs_params.index_name)

+ # verify index exists and update cache
+ try:
+     backend.get_index_info(config=config, index_name=add_docs_params.index_name)
+ except errors.IndexNotFoundError:
+     raise errors.IndexNotFoundError(f"Cannot add documents to non-existent index {add_docs_params.index_name}")

try:
# Empty text search:
# 1. loads model into memory, 2. updates cache for multiprocessing
@@ -387,9 +384,8 @@ def add_documents(config: Config, add_docs_params: AddDocsParams):
raise errors.InternalError("add_documents (internal function) cannot be called without setting device!")
try:
index_info = backend.get_index_info(config=config, index_name=add_docs_params.index_name)
- except errors.IndexNotFoundError as s:
-     create_vector_index(config=config, index_name=add_docs_params.index_name)
-     index_info = backend.get_index_info(config=config, index_name=add_docs_params.index_name)
+ except errors.IndexNotFoundError:
+     raise errors.IndexNotFoundError(f"Cannot add documents to non-existent index {add_docs_params.index_name}")

if len(add_docs_params.docs) == 0:
raise errors.BadRequestError(message="Received empty add documents request")
@@ -422,7 +418,7 @@ def add_documents(config: Config, add_docs_params: AddDocsParams):
image_repo = add_docs.download_images(docs=add_docs_params.docs, thread_count=20,
non_tensor_fields=tuple(add_docs_params.non_tensor_fields),
image_download_headers=add_docs_params.image_download_headers)

if add_docs_params.use_existing_tensors:
doc_ids = []

@@ -628,7 +624,7 @@ def add_documents(config: Config, add_docs_params: AddDocsParams):
TensorField.field_content: text_chunk,
TensorField.field_name: field
})

elif isinstance(field_content, dict):
if add_docs_params.mappings[field]["type"] == "multimodal_combination":
(combo_chunk, combo_document_is_valid,
Expand All @@ -647,7 +643,7 @@ def add_documents(config: Config, add_docs_params: AddDocsParams):
# TODO: we may want to use chunks_to_append here to make it uniform with use_existing_tensors and normal vectorisation
chunks.append({**combo_chunk, **chunk_values_for_filtering})
continue

# Add chunks_to_append along with doc metadata to total chunks
for chunk in chunks_to_append:
chunks.append({**chunk, **chunk_values_for_filtering})
@@ -1074,7 +1070,7 @@ def search(config: Config, index_name: str, text: Union[str, dict],
logger.debug(f"No device given for search. Defaulting to best available device: {selected_device}")
else:
selected_device = device

if search_method.upper() == SearchMethod.TENSOR:
search_result = _vector_text_search(
config=config, index_name=index_name, query=text, result_count=result_count, offset=offset,
@@ -1283,7 +1279,7 @@ def get_vector_properties_to_search(searchable_attributes: Union[None, List[str]
properties_to_search = list(searchable_attributes_as_vectors.intersection(
index_info.get_vector_properties().keys()
))

# Validation for offset (pagination is single field) if offset not provided, validation is not run.
if len(properties_to_search) != 1 and offset > 0:
human_readable_vector_properties = [v.replace(TensorField.vector_prefix, '') for v in
@@ -1309,8 +1305,8 @@ def construct_msearch_body_elements(searchableAttributes: List[str], offset: int
["knn"][f"{TensorField.chunks}.{vector_field}"][
"filter"] = {
"query_string": {"query": f"{contextualised_filter}"}
- }
- else:
+ }
+ else:
search_query = _create_normal_tensor_search_query(result_count, offset, vector_field, query_vector)
if filter_string is not None:
search_query["query"]["nested"]["query"]["knn"][f"{TensorField.chunks}.{vector_field}"][
@@ -1537,11 +1533,11 @@ def get_query_vectors_from_jobs(
]
# TODO how doe we ensure order?
weighted_vectors = [np.asarray(vec) * weight for vec, weight, content in vectorised_ordered_queries]
- context_tensors = q.get_context_tensor()
-
+
+ context_tensors = q.get_context_tensor()
if context_tensors is not None:
weighted_vectors += [np.asarray(v.vector) * v.weight for v in context_tensors]

try:
merged_vector = np.mean(weighted_vectors, axis=0)
except ValueError as e:
@@ -1638,7 +1634,7 @@ def _bulk_vector_text_search(config: Config, queries: List[BulkSearchQueryEntity

if not device:
raise errors.InternalError("_bulk_vector_text_search cannot be called without `device`!")

with RequestMetricsStore.for_request().time("bulk_search.vector.processing_before_opensearch",
lambda t : logger.debug(f"bulk search (tensor) pre-processing: took {t:.3f}ms")
):
@@ -1661,7 +1657,7 @@ def _bulk_vector_text_search(config: Config, queries: List[BulkSearchQueryEntity
if not aggregate_body:
# Must return empty response, per search query
return create_empty_query_response(queries)

## 5. POST aggregate to /_msearch
responses = bulk_msearch(config, aggregate_body)

@@ -1748,7 +1744,7 @@ def _vector_text_search(
# # SEARCH TIMER-LOGGER (pre-processing)
if not device:
raise errors.InternalError("_vector_text_search cannot be called without `device`!")

RequestMetricsStore.for_request().start("search.vector.processing_before_opensearch")

try:
@@ -1801,7 +1797,7 @@ def _vector_text_search(
gathered_docs = boost_score(gathered_docs, boost, searchable_attributes)

completely_sorted = sort_chunks(gathered_docs)

if verbose:
print("Chunk vector search, sorted result:")
if verbose == 1:
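Net effect of the `tensor_search.py` changes: both the multiprocessing path in `add_documents_orchestrator` and `add_documents` itself now raise `IndexNotFoundError` for a missing index. A sketch of how a caller sees this (the `AddDocsParams` module path and required fields are assumptions based on the signatures in this diff):

```python
from marqo import errors
from marqo.tensor_search import tensor_search
from marqo.tensor_search.models.add_docs_objects import AddDocsParams  # path assumed


def add_docs_or_fail(config, index_name: str, docs: list):
    """Fail fast on a missing index; no implicit create_vector_index call."""
    params = AddDocsParams(index_name=index_name, docs=docs, auto_refresh=True)
    try:
        return tensor_search.add_documents_orchestrator(
            config=config, add_docs_params=params, batch_size=0, processes=1
        )
    except errors.IndexNotFoundError:
        # The caller must create the index first, e.g. via create_vector_index.
        raise
```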
10 changes: 10 additions & 0 deletions tests/marqo_test.py
@@ -1,4 +1,7 @@
import unittest
+
+ from marqo.errors import IndexNotFoundError
+ from marqo.tensor_search import tensor_search
from marqo.tensor_search.telemetry import RequestMetricsStore
from marqo.tensor_search.utils import construct_authorized_url
from marqo import config
@@ -21,6 +24,13 @@ def configure_request_metrics(cls):
def tearDownClass(cls):
cls.patcher.stop()

+ # Delete any test indices my-test-index-i (up to 5)
+ for ix_name in [f"my-test-index-{i}" for i in range(1, 6)]:
+     try:
+         tensor_search.delete_index(config=cls.config, index_name=ix_name)
+     except IndexNotFoundError:
+         pass
+
@classmethod
def setUpClass(cls) -> None:
cls.configure_request_metrics()
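With implicit creation gone, tests create their indices up front and rely on the teardown above to sweep `my-test-index-{i}` leftovers. A hedged sketch of the per-test pattern (the base-class name is an assumption; `create_vector_index` itself appears in this diff):

```python
from marqo.tensor_search import tensor_search
from tests.marqo_test import MarqoTestCase  # base-class name assumed


class TestExplicitIndexCreation(MarqoTestCase):
    def setUp(self):
        # Indices are no longer auto-created, so each test makes its own.
        # Names of the form my-test-index-{i} are swept by tearDownClass.
        self.index_name = "my-test-index-1"
        tensor_search.create_vector_index(
            config=self.config, index_name=self.index_name
        )
```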