Remove Parallel (Processes) in Marqo #523

Merged · 10 commits merged on Jun 30, 2023
3 changes: 1 addition & 2 deletions src/marqo/tensor_search/api.py
@@ -173,7 +173,6 @@ def add_or_replace_documents(
         refresh: bool = True,
         marqo_config: config.Config = Depends(generate_config),
         batch_size: int = 0,
-        processes: int = 1,
         non_tensor_fields: List[str] = Query(default=[]),
         device: str = Depends(api_validation.validate_device),
         use_existing_tensors: bool = False,
@@ -194,7 +193,7 @@ def add_or_replace_documents(
     with RequestMetricsStore.for_request().time(f"POST /indexes/{index_name}/documents"):
         return tensor_search.add_documents_orchestrator(
             config=marqo_config, add_docs_params=add_docs_params,
-            batch_size=batch_size, processes=processes
+            batch_size=batch_size
         )


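For context, here is a minimal sketch of a client call to this endpoint after the change. The host, port, index name, and documents are hypothetical stand-ins; the point is that batch_size is now the only Marqo-side batching parameter left on the route.

# Hypothetical client call (assumes a local Marqo on its default port 8882
# and an index named "my-index"); the removed `processes` query parameter
# is no longer part of the endpoint signature.
import requests

docs = [
    {"_id": "doc1", "Title": "A short document"},
    {"_id": "doc2", "Title": "Another short document"},
]

resp = requests.post(
    "http://localhost:8882/indexes/my-index/documents",
    json=docs,
    params={"batch_size": 1},
)
print(resp.json())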
245 changes: 0 additions & 245 deletions src/marqo/tensor_search/parallel.py

This file was deleted.
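The deleted module exposed add_documents_mp, which the orchestrator invoked as parallel.add_documents_mp(config=..., batch_size=..., processes=..., add_docs_params=...) — see the removed branch in tensor_search.py below. As a rough illustration of the pattern being removed (not the actual deleted code), a multiprocessing pool fanned document batches out across worker processes:

# Illustrative sketch only, not the deleted parallel.py: batches of documents
# are distributed across worker processes, each indexing its own share.
from multiprocessing import Pool
from typing import Any, Dict, List

def _index_batch(batch: List[Dict[str, Any]]) -> int:
    # Stand-in for the per-process indexing work.
    return len(batch)

def add_documents_mp(docs: List[Dict[str, Any]], batch_size: int, processes: int) -> List[int]:
    batches = [docs[i:i + batch_size] for i in range(0, len(docs), batch_size)]
    with Pool(processes=processes) as pool:
        return pool.map(_index_batch, batches)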

52 changes: 9 additions & 43 deletions src/marqo/tensor_search/tensor_search.py
@@ -50,7 +50,7 @@
     EnvVars
 )
 from marqo.tensor_search.enums import IndexSettingsField as NsField
-from marqo.tensor_search import utils, backend, validation, configs, parallel, add_docs
+from marqo.tensor_search import utils, backend, validation, configs, add_docs
 from marqo.tensor_search.formatting import _clean_doc
 from marqo.tensor_search.index_meta_cache import get_cache, get_index_info
 from marqo.tensor_search import index_meta_cache
@@ -230,8 +230,7 @@ def get_stats(config: Config, index_name: str):

 def add_documents_orchestrator(
         config: Config, add_docs_params: AddDocsParams,
-        batch_size: int = 0, processes: int = 1,
-):
+        batch_size: int = 0):
     # Default device calculated here and not in add_documents call
     if add_docs_params.device is None:
         selected_device = utils.read_env_vars_and_defaults("MARQO_BEST_AVAILABLE_DEVICE")
@@ -241,47 +240,14 @@ def add_documents_orchestrator(
         logger.debug(f"No device given for add_documents_orchestrator. Defaulting to best available device: {selected_device}")
     else:
         add_docs_params_with_device = add_docs_params

     if batch_size is None or batch_size == 0:
-        logger.debug(f"batch_size={batch_size} and processes={processes} - not doing any marqo side batching")
+        logger.debug(f"batch_size={batch_size} - not doing any marqo side batching")
         return add_documents(config=config, add_docs_params=add_docs_params_with_device)
-    elif processes is not None and processes > 1:
-
-        # verify index exists and update cache
-        try:
-            backend.get_index_info(config=config, index_name=add_docs_params.index_name)
-        except errors.IndexNotFoundError:
-            raise errors.IndexNotFoundError(f"Cannot add documents to non-existent index {add_docs_params.index_name}")
-
-        try:
-            # Empty text search:
-            # 1. loads model into memory, 2. updates cache for multiprocessing
-            _vector_text_search(
-                config=config, index_name=add_docs_params.index_name, query='',
-                model_auth=add_docs_params.model_auth, device=add_docs_params_with_device.device,
-                image_download_headers=add_docs_params.image_download_headers)
-        except Exception as e:
-            logger.warning(
-                f"add_documents orchestrator's call to vector text search, prior to parallel add_docs, raised an error. "
-                f"Continuing to parallel add_docs. "
-                f"Message: {e}"
-            )
-
-        logger.debug(f"batch_size={batch_size} and processes={processes} - using multi-processing")
-        results = parallel.add_documents_mp(
-            config=config, batch_size=batch_size, processes=processes, add_docs_params=add_docs_params_with_device
-        )
-        # we need to force the cache to update as it does not propagate using mp
-        # we just clear this index's entry and it will re-populate when needed next
-        if add_docs_params.index_name in get_cache():
-            logger.info(f'deleting cache entry for {add_docs_params.index_name} after parallel add documents')
-            del get_cache()[add_docs_params.index_name]
-
-        return results
     else:
         if batch_size < 0:
             raise errors.InvalidArgError("Batch size can't be less than 1!")
-        logger.debug(f"batch_size={batch_size} and processes={processes} - batching using a single process")
+        logger.debug(f"batch_size={batch_size} - batching inside marqo")
         return _batch_request(config=config, verbose=False, add_docs_params=add_docs_params_with_device, batch_size=batch_size)
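After this change the orchestrator has only two paths: no Marqo-side batching (batch_size of None or 0) or sequential batching via _batch_request. A minimal sketch of the surviving control flow, with index_docs as a hypothetical stand-in for add_documents:

# Simplified sketch of the surviving control flow; not Marqo's actual code.
from typing import Any, Dict, List

def index_docs(batch: List[Dict[str, Any]]) -> int:
    # Hypothetical stand-in for add_documents(); returns a fake result.
    return len(batch)

def orchestrate(docs: List[Dict[str, Any]], batch_size: int = 0) -> List[int]:
    if not batch_size:
        # batch_size None/0: no Marqo-side batching, index everything at once
        return [index_docs(docs)]
    if batch_size < 0:
        raise ValueError("Batch size can't be less than 1!")
    # Otherwise batch inside Marqo: one sequential call per slice
    return [index_docs(docs[i:i + batch_size]) for i in range(0, len(docs), batch_size)]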


@@ -643,7 +609,7 @@ def add_documents(config: Config, add_docs_params: AddDocsParams):
# TODO: we may want to use chunks_to_append here to make it uniform with use_existing_tensors and normal vectorisation
chunks.append({**combo_chunk, **chunk_values_for_filtering})
continue

# Add chunks_to_append along with doc metadata to total chunks
for chunk in chunks_to_append:
chunks.append({**chunk, **chunk_values_for_filtering})
@@ -1279,7 +1245,7 @@ def get_vector_properties_to_search(searchable_attributes: Union[None, List[str]
properties_to_search = list(searchable_attributes_as_vectors.intersection(
index_info.get_vector_properties().keys()
))

# Validation for offset (pagination is single field) if offset not provided, validation is not run.
if len(properties_to_search) != 1 and offset > 0:
human_readable_vector_properties = [v.replace(TensorField.vector_prefix, '') for v in
@@ -1634,7 +1600,7 @@ def _bulk_vector_text_search(config: Config, queries: List[BulkSearchQueryEntity

if not device:
raise errors.InternalError("_bulk_vector_text_search cannot be called without `device`!")

with RequestMetricsStore.for_request().time("bulk_search.vector.processing_before_opensearch",
lambda t : logger.debug(f"bulk search (tensor) pre-processing: took {t:.3f}ms")
):
@@ -1657,7 +1623,7 @@ def _bulk_vector_text_search(config: Config, queries: List[BulkSearchQueryEntity
if not aggregate_body:
# Must return empty response, per search query
return create_empty_query_response(queries)

## 5. POST aggregate to /_msearch
responses = bulk_msearch(config, aggregate_body)

4 changes: 2 additions & 2 deletions tests/tensor_search/test_add_documents.py
@@ -454,7 +454,7 @@ def test_add_documents_list_data_type_validation(self):
         assert add_res['errors'] is True
         assert all(['error' in item for item in add_res['items'] if item['_id'].startswith('to_fail')])

-    def test_add_documents_orchestrator_set_device_single_process(self):
+    def test_add_documents_orchestrator_set_device(self):
         mock_config = copy.deepcopy(self.config)

         mock_vectorise = mock.MagicMock()
@@ -467,7 +467,7 @@ def run():
                 index_name=self.index_name_1, device="cuda:22", docs=[{"some": "doc"}, {"som other": "doc"}],
                 auto_refresh=True,
             ),
-            batch_size=1, processes=1
+            batch_size=1
         )
         return True

2 changes: 1 addition & 1 deletion tests/tensor_search/test_bulk_search.py
@@ -1422,7 +1422,7 @@ def test_limit_results_none(self):
         tensor_search.add_documents_orchestrator(
             config=self.config, add_docs_params=AddDocsParams(index_name=self.index_name_1,
                 docs=[{"Title": "a " + (" ".join(random.choices(population=vocab, k=25)))}
-                      for _ in range(700)], auto_refresh=False), processes=4, batch_size=50
+                      for _ in range(700)], auto_refresh=False), batch_size=50
         )
         tensor_search.refresh_index(config=self.config, index_name=self.index_name_1)

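The caller-side migration is the same everywhere in this PR: drop the processes argument and keep batch_size. With cfg and params as hypothetical stand-ins for a Config and an AddDocsParams:

# Before this PR:
tensor_search.add_documents_orchestrator(
    config=cfg, add_docs_params=params, batch_size=50, processes=4)

# After this PR:
tensor_search.add_documents_orchestrator(
    config=cfg, add_docs_params=params, batch_size=50)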