Disable auto create index #516

Merged · 18 commits · Jun 30, 2023

README.md (9 changes: 6 additions & 3 deletions)

@@ -103,6 +103,8 @@ import marqo
 
 mq = marqo.Client(url='http://localhost:8882')
 
+mq.create_index(index_name="my-first-index")
+
 mq.index("my-first-index").add_documents([
     {
         "Title": "The Travels of Marco Polo",
@@ -122,11 +124,10 @@ results = mq.index("my-first-index").search(
 
 ```
 
-- `mq` is the client that wraps the `marqo` API
+- `mq` is the client that wraps the `marqo` API.
+- `create_index()` creates a new index with default settings.
 - `add_documents()` takes a list of documents, represented as python dicts for indexing.
-- `add_documents()` creates an index with default settings, if one does not already exist.
 - You can optionally set a document's ID with the special `_id` field. Otherwise, Marqo will generate one.
-- If the index doesn't exist, Marqo will create it. If it exists then Marqo will add the documents to the index.
 
 Let's have a look at the results:
 
@@ -273,6 +274,8 @@ import pprint
 
 mq = marqo.Client(url="http://localhost:8882")
 
+mq.create_index("my-weighted-query-index")
+
 mq.index("my-weighted-query-index").add_documents(
     [
         {
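
The README edits above change the documented contract: `add_documents()` no longer creates a missing index, so `create_index()` must be called first. A minimal sketch of the new end-to-end flow, mirroring the README example (the document fields, `_id`, and query string are illustrative):

```python
import marqo

mq = marqo.Client(url="http://localhost:8882")

# Explicit index creation is now a required first step.
mq.create_index(index_name="my-first-index")

mq.index("my-first-index").add_documents([
    {
        "Title": "The Travels of Marco Polo",
        "_id": "article_1",  # optional; Marqo generates an ID if omitted
    }
])

results = mq.index("my-first-index").search("travel writing")
```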

src/marqo/tensor_search/backend.py (4 changes: 2 additions & 2 deletions)

@@ -28,8 +28,8 @@ def get_index_info(config: Config, index_name: str) -> IndexInfo:
         IndexInfo of the index
 
     Raises:
-        NonTensorIndexError, if the index's mapping doesn't conform to a Tensor Search index
-
+        NonTensorIndexError: If the index's mapping doesn't conform to a Tensor Search index.
+        IndexNotFoundError: If index does not exist.
     """
     res = HttpRequests(config).get(path=F"{index_name}/_mapping")
 
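
The docstring now commits `get_index_info` to raising `IndexNotFoundError` for a missing index rather than leaving that case unspecified. A sketch of how a caller might lean on that contract; the module names come from this diff, and the exact import style is an assumption:

```python
# Sketch only: `errors` and `backend` are the modules referenced
# throughout this diff; import paths are assumed.
from marqo import errors
from marqo.tensor_search import backend

def index_exists(config, index_name: str) -> bool:
    """Return True if `index_name` resolves to a tensor search index."""
    try:
        backend.get_index_info(config=config, index_name=index_name)
    except errors.IndexNotFoundError:
        return False
    return True
```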

src/marqo/tensor_search/tensor_search.py (52 changes: 24 additions & 28 deletions)

@@ -228,14 +228,6 @@ def get_stats(config: Config, index_name: str):
     }
 
 
-def _check_and_create_index_if_not_exist(config: Config, index_name: str):
-    try:
-        index_info = backend.get_index_info(config=config, index_name=index_name)
-    except errors.IndexNotFoundError as s:
-        create_vector_index(config=config, index_name=index_name)
-        index_info = backend.get_index_info(config=config, index_name=index_name)
-
-
 def add_documents_orchestrator(
         config: Config, add_docs_params: AddDocsParams,
         batch_size: int = 0, processes: int = 1,
@@ -249,13 +241,18 @@ def add_documents_orchestrator(
         logger.debug(f"No device given for add_documents_orchestrator. Defaulting to best available device: {selected_device}")
     else:
         add_docs_params_with_device = add_docs_params
 
     if batch_size is None or batch_size == 0:
         logger.debug(f"batch_size={batch_size} and processes={processes} - not doing any marqo side batching")
         return add_documents(config=config, add_docs_params=add_docs_params_with_device)
     elif processes is not None and processes > 1:
-        # create beforehand or pull from the cache so it is up to date for the multi-processing
-        _check_and_create_index_if_not_exist(config=config, index_name=add_docs_params.index_name)
+
+        # verify index exists and update cache
+        try:
+            backend.get_index_info(config=config, index_name=add_docs_params.index_name)
+        except errors.IndexNotFoundError:
+            raise errors.IndexNotFoundError(f"Cannot add documents to non-existent index {add_docs_params.index_name}")
 
         try:
             # Empty text search:
             # 1. loads model into memory, 2. updates cache for multiprocessing
@@ -387,9 +384,8 @@ def add_documents(config: Config, add_docs_params: AddDocsParams):
         raise errors.InternalError("add_documents (internal function) cannot be called without setting device!")
     try:
         index_info = backend.get_index_info(config=config, index_name=add_docs_params.index_name)
-    except errors.IndexNotFoundError as s:
-        create_vector_index(config=config, index_name=add_docs_params.index_name)
-        index_info = backend.get_index_info(config=config, index_name=add_docs_params.index_name)
+    except errors.IndexNotFoundError:
+        raise errors.IndexNotFoundError(f"Cannot add documents to non-existent index {add_docs_params.index_name}")
 
     if len(add_docs_params.docs) == 0:
         raise errors.BadRequestError(message="Received empty add documents request")
@@ -430,7 +426,7 @@ def add_documents(config: Config, add_docs_params: AddDocsParams):
     image_repo = add_docs.download_images(docs=add_docs_params.docs, thread_count=20,
                                           non_tensor_fields=tuple(add_docs_params.non_tensor_fields),
                                           image_download_headers=add_docs_params.image_download_headers)
-
+
     if add_docs_params.update_mode == 'replace' and add_docs_params.use_existing_tensors:
         doc_ids = []
 
@@ -640,7 +636,7 @@ def add_documents(config: Config, add_docs_params: AddDocsParams):
                         TensorField.field_content: text_chunk,
                         TensorField.field_name: field
                     })
-
+
                 elif isinstance(field_content, dict):
                     if add_docs_params.mappings[field]["type"] == "multimodal_combination":
                         (combo_chunk, combo_document_is_valid,
@@ -659,7 +655,7 @@ def add_documents(config: Config, add_docs_params: AddDocsParams):
                         # TODO: we may want to use chunks_to_append here to make it uniform with use_existing_tensors and normal vectorisation
                         chunks.append({**combo_chunk, **chunk_values_for_filtering})
                         continue
-
+
             # Add chunks_to_append along with doc metadata to total chunks
             for chunk in chunks_to_append:
                 chunks.append({**chunk, **chunk_values_for_filtering})
@@ -1145,7 +1141,7 @@ def search(config: Config, index_name: str, text: Union[str, dict],
         logger.debug(f"No device given for search. Defaulting to best available device: {selected_device}")
     else:
         selected_device = device
-
+
     if search_method.upper() == SearchMethod.TENSOR:
         search_result = _vector_text_search(
             config=config, index_name=index_name, query=text, result_count=result_count, offset=offset,
@@ -1354,7 +1350,7 @@ def get_vector_properties_to_search(searchable_attributes: Union[None, List[str]
     properties_to_search = list(searchable_attributes_as_vectors.intersection(
         index_info.get_vector_properties().keys()
     ))
-
+
     # Validation for offset (pagination is single field) if offset not provided, validation is not run.
     if len(properties_to_search) != 1 and offset > 0:
         human_readable_vector_properties = [v.replace(TensorField.vector_prefix, '') for v in
@@ -1380,8 +1376,8 @@ def construct_msearch_body_elements(searchableAttributes: List[str], offset: int
                 ["knn"][f"{TensorField.chunks}.{vector_field}"][
                     "filter"] = {
                     "query_string": {"query": f"{contextualised_filter}"}
-                }
-            else:
+                }
+            else:
                 search_query = _create_normal_tensor_search_query(result_count, offset, vector_field, query_vector)
                 if filter_string is not None:
                     search_query["query"]["nested"]["query"]["knn"][f"{TensorField.chunks}.{vector_field}"][
@@ -1608,11 +1604,11 @@ def get_query_vectors_from_jobs(
         ]
         # TODO how doe we ensure order?
         weighted_vectors = [np.asarray(vec) * weight for vec, weight, content in vectorised_ordered_queries]
-        context_tensors = q.get_context_tensor()
-
+
+        context_tensors = q.get_context_tensor()
         if context_tensors is not None:
             weighted_vectors += [np.asarray(v.vector) * v.weight for v in context_tensors]
-
+
         try:
             merged_vector = np.mean(weighted_vectors, axis=0)
         except ValueError as e:
@@ -1709,7 +1705,7 @@ def _bulk_vector_text_search(config: Config, queries: List[BulkSearchQueryEntity
 
     if not device:
         raise errors.InternalError("_bulk_vector_text_search cannot be called without `device`!")
-
+
     with RequestMetricsStore.for_request().time("bulk_search.vector.processing_before_opensearch",
         lambda t : logger.debug(f"bulk search (tensor) pre-processing: took {t:.3f}ms")
     ):
@@ -1732,7 +1728,7 @@ def _bulk_vector_text_search(config: Config, queries: List[BulkSearchQueryEntity
     if not aggregate_body:
         # Must return empty response, per search query
         return create_empty_query_response(queries)
-
+
     ## 5. POST aggregate to /_msearch
     responses = bulk_msearch(config, aggregate_body)
 
@@ -1819,7 +1815,7 @@ def _vector_text_search(
     # # SEARCH TIMER-LOGGER (pre-processing)
     if not device:
         raise errors.InternalError("_vector_text_search cannot be called without `device`!")
-
+
     RequestMetricsStore.for_request().start("search.vector.processing_before_opensearch")
 
     try:
@@ -1872,7 +1868,7 @@ def _vector_text_search(
         gathered_docs = boost_score(gathered_docs, boost, searchable_attributes)
 
     completely_sorted = sort_chunks(gathered_docs)
-
+
     if verbose:
         print("Chunk vector search, sorted result:")
         if verbose == 1:
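
With `_check_and_create_index_if_not_exist` deleted and both `add_documents` paths raising `IndexNotFoundError`, create-if-missing becomes the caller's responsibility. A sketch of an application-side equivalent, assembled from the functions visible in this diff (`backend.get_index_info`, `create_vector_index`, `errors.IndexNotFoundError`); the import paths are assumptions:

```python
# Application-side opt-in to the old auto-create behaviour, now that
# the server no longer creates indexes implicitly.
from marqo import errors
from marqo.tensor_search import backend
from marqo.tensor_search.tensor_search import create_vector_index  # assumed import path

def ensure_index(config, index_name: str) -> None:
    """Create `index_name` with default settings if it does not exist."""
    try:
        backend.get_index_info(config=config, index_name=index_name)
    except errors.IndexNotFoundError:
        create_vector_index(config=config, index_name=index_name)
```

Callers that want the old behaviour can run `ensure_index` once before `add_documents`; everything else now fails fast on a missing index.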