Deletion optimisation #436

Merged · 19 commits · Apr 20, 2023
Showing changes from 13 of the 19 commits.
4 changes: 3 additions & 1 deletion src/marqo/config.py
@@ -8,7 +8,8 @@ def __init__(
url: str,
timeout: Optional[int] = None,
indexing_device: Optional[Union[enums.Device, str]] = None,
search_device: Optional[Union[enums.Device, str]] = None
search_device: Optional[Union[enums.Device, str]] = None,
backend: Optional[Union[enums.SearchDb, str]] = None,
) -> None:
"""
Parameters
@@ -23,6 +24,7 @@

self.indexing_device = indexing_device if indexing_device is not None else default_device
self.search_device = search_device if search_device is not None else default_device
self.backend = backend if backend is not None else enums.SearchDb.opensearch

def set_url(self, url):
"""Set the URL, and infers whether that url is remote"""
8 changes: 4 additions & 4 deletions src/marqo/tensor_search/api.py
@@ -1,8 +1,9 @@
"""The API entrypoint for Tensor Search"""
import typing
from fastapi.responses import JSONResponse
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.exceptions import RequestValidationError
from fastapi import Request, Depends
import marqo.tensor_search.delete_docs
import marqo.tensor_search.tensor_search
from marqo.errors import InvalidArgError, MarqoWebError, MarqoError
from fastapi import FastAPI, Query
import json
@@ -12,7 +13,6 @@
import os
from marqo.tensor_search.models.api_models import BulkSearchQuery, SearchQuery
from marqo.tensor_search.web import api_validation, api_utils
from marqo.tensor_search import utils
from marqo.tensor_search.on_start_script import on_start
from marqo import version
from marqo.tensor_search.backend import get_index_info
@@ -237,7 +237,7 @@ def delete_index(index_name: str, marqo_config: config.Config = Depends(generate
@app.post("/indexes/{index_name}/documents/delete-batch")
def delete_docs(index_name: str, documentIds: List[str], refresh: bool = True,
marqo_config: config.Config = Depends(generate_config)):
return tensor_search.delete_documents(
return marqo.tensor_search.tensor_search.delete_documents(
index_name=index_name, config=marqo_config, doc_ids=documentIds,
auto_refresh=refresh
)
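
For context, the delete-batch endpoint above takes the document IDs as a JSON array body and refresh as a query parameter. A minimal client-side call might look like the following sketch (host, port and document IDs are illustrative assumptions, not taken from this PR):

import requests

# Illustrative call to POST /indexes/{index_name}/documents/delete-batch.
resp = requests.post(
    "http://localhost:8882/indexes/my-index/documents/delete-batch",
    params={"refresh": "true"},
    json=["doc_1", "doc_2"],   # body is a plain JSON list of document IDs
)
print(resp.json())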
96 changes: 96 additions & 0 deletions src/marqo/tensor_search/delete_docs.py
@@ -0,0 +1,96 @@
"""
This module handles the delete documents endpoint
"""
import datetime
from typing import List, NamedTuple, Literal
import json
from marqo import errors
from marqo._httprequests import HttpRequests
from marqo.config import Config
from marqo.tensor_search import validation, utils, enums


# -- Marqo delete endpoint interface: --


class MqDeleteDocsResponse(NamedTuple):
"""An object that holds the data we send back to users"""
index_name: str
status_string: Literal["succeeded"]
document_ids: List[str]
    deleted_documents_count: int
deletion_start: datetime.datetime
deletion_end: datetime.datetime


def format_delete_docs_response(marqo_response: MqDeleteDocsResponse) -> dict:
    """Formats the delete response for users."""
    return {
        "index_name": marqo_response.index_name, "status": marqo_response.status_string,
        "type": "documentDeletion", "details": {
            "receivedDocumentIds": len(marqo_response.document_ids),
            "deletedDocuments": marqo_response.deleted_documents_count,
        },
        "duration": utils.create_duration_string(marqo_response.deletion_end - marqo_response.deletion_start),
        "startedAt": utils.format_timestamp(marqo_response.deletion_start),
        "finishedAt": utils.format_timestamp(marqo_response.deletion_end),
    }


class MqDeleteDocsRequest(NamedTuple):
"""An object that holds the data from users for a delete request"""
index_name: str
document_ids: List[str]
auto_refresh: bool


# -- Data-layer agnostic logic --


def delete_documents(config: Config, del_request: MqDeleteDocsRequest) -> dict:
"""entrypoint function for deleting documents"""
if not del_request.document_ids:
raise errors.InvalidDocumentIdError("doc_ids can't be empty!")

for _id in del_request.document_ids:
validation.validate_id(_id)

if config.backend == enums.SearchDb.opensearch:
del_response: MqDeleteDocsResponse = delete_documents_marqo_os(config=config, deletion_instruction=del_request)
else:
        raise RuntimeError(f"Config set to use unknown backend `{config.backend}`. "
                           f"See tensor_search.enums.SearchDb for allowed backends")

return format_delete_docs_response(del_response)


# -- Marqo-OS-specific deletion implementation: --


def delete_documents_marqo_os(config: Config, deletion_instruction: MqDeleteDocsRequest) -> MqDeleteDocsResponse:

Review comment:

Be careful with the unbounded size of deletion_instruction.document_ids. There are limits on the size of HTTP requests; this is usually not a problem, but if you are sending millions of delete operations it can add up quickly.

Some other feedback (a minimal sketch of the chunk-and-parallelise approach follows this list):

  • Consider chunking requests into, say, 10,000 operations each, and sending those requests in parallel.
  • You should:
    • attempt retries when a request is retryable based on the response HTTP code (e.g. 429),
    • manage back-offs between retries,
    • send multiple requests in parallel for higher throughput.
  • See the linked reference (under the "BulkAllObservable helper" heading) for inspiration.
  • Inspect the bulk response(s) to determine whether the operations actually succeeded.
  • Consider disabling/changing the index refresh interval around your bulk request (and ensure it is set back correctly afterwards!).
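
A minimal sketch of that chunk-and-parallelise approach (illustrative only, not part of this PR: the helper names, chunk size, worker count and retry parameters are assumptions, and it calls the backend with requests directly rather than through Marqo's HttpRequests wrapper):

import json
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator, List

import requests

CHUNK_SIZE = 10_000              # delete operations per bulk request
MAX_RETRIES = 3
RETRYABLE_STATUSES = {429, 503}  # throttled / temporarily unavailable


def _chunk(ids: List[str], size: int) -> Iterator[List[str]]:
    for i in range(0, len(ids), size):
        yield ids[i:i + size]


def _bulk_delete_chunk(base_url: str, index_name: str, ids: List[str]) -> dict:
    """Send one NDJSON _bulk delete request, retrying with exponential back-off."""
    body = "".join(
        json.dumps({"delete": {"_index": index_name, "_id": _id}}) + "\n" for _id in ids
    )
    backoff_seconds = 1.0
    for attempt in range(MAX_RETRIES + 1):
        resp = requests.post(
            f"{base_url}/_bulk", data=body,
            headers={"Content-Type": "application/x-ndjson"},
        )
        if resp.status_code not in RETRYABLE_STATUSES or attempt == MAX_RETRIES:
            resp.raise_for_status()
            return resp.json()
        time.sleep(backoff_seconds)  # back off before retrying a throttled request
        backoff_seconds *= 2


def bulk_delete_in_parallel(base_url: str, index_name: str, ids: List[str]) -> int:
    """Delete ids in parallel chunks; return the number of documents reported deleted."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        responses = pool.map(
            lambda chunk: _bulk_delete_chunk(base_url, index_name, chunk),
            _chunk(ids, CHUNK_SIZE),
        )
        return sum(
            1
            for response in responses
            for item in response["items"]
            if item.get("delete", {}).get("status") == 200
        )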

"""Deletes documents """

# Prepare bulk delete request body
bulk_request_body = ""
for doc_id in deletion_instruction.document_ids:
bulk_request_body += json.dumps({"delete": {"_index": deletion_instruction.index_name, "_id": doc_id}}) + "\n"

# Send bulk delete request
t0 = datetime.datetime.utcnow()
delete_res_backend = HttpRequests(config=config).post(
path="_bulk",
body=bulk_request_body,
)

if deletion_instruction.auto_refresh:
refresh_response = HttpRequests(config).post(path=f"{deletion_instruction.index_name}/_refresh")

t1 = datetime.datetime.utcnow()
deleted_documents_count = sum(1 for item in delete_res_backend["items"] if "delete" in item and item["delete"]["status"] == 200)

mq_delete_res = MqDeleteDocsResponse(
index_name=deletion_instruction.index_name, status_string='succeeded', document_ids=deletion_instruction.document_ids,
        deleted_documents_count=deleted_documents_count, deletion_start=t0,
deletion_end=t1
)
return mq_delete_res
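
For reference, the success count computed above assumes a Marqo-OS/OpenSearch _bulk response shaped roughly like the sketch below (field values are illustrative, not captured from a real cluster):

# Illustrative shape of the `_bulk` response that the counting logic iterates over.
delete_res_backend = {
    "took": 30,
    "errors": False,
    "items": [
        {"delete": {"_index": "my-index", "_id": "doc_1", "status": 200, "result": "deleted"}},
        {"delete": {"_index": "my-index", "_id": "doc_2", "status": 404, "result": "not_found"}},
    ],
}
# Only items with status 200 are counted, so "doc_2" above would not be included.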
3 changes: 3 additions & 0 deletions src/marqo/tensor_search/enums.py
@@ -115,6 +115,9 @@ class MappingsObjectType:
multimodal_combination = "multimodal_combination"


class SearchDb:
opensearch = 'opensearch'

# Perhaps create a ThrottleType to differentiate thread_count and data_size throttling mechanisms


52 changes: 14 additions & 38 deletions src/marqo/tensor_search/tensor_search.py
@@ -32,14 +32,13 @@
"""
import copy
import json
import datetime
from collections import defaultdict
from timeit import default_timer as timer
import functools
import pprint
import typing
import uuid
from typing import List, Optional, Union, Iterable, Sequence, Dict, Any, Tuple, Set
from typing import List, Optional, Union, Iterable, Sequence, Dict, Any, Tuple
import numpy as np
from PIL import Image
import marqo.config as config
@@ -56,6 +55,7 @@
from marqo.tensor_search.models.search import VectorisedJobs, VectorisedJobPointer, Qidx, JHash
from marqo.tensor_search.models.index_info import IndexInfo
from marqo.tensor_search.utils import add_timing
from marqo.tensor_search import delete_docs
from marqo.s2_inference.processing import text as text_processor
from marqo.s2_inference.processing import image as image_processor
from marqo.s2_inference.clip_utils import _is_image
@@ -959,41 +959,6 @@ def _get_documents_for_upsert(
return res


def delete_documents(config: Config, index_name: str, doc_ids: List[str], auto_refresh):
"""Deletes documents """
if not doc_ids:
raise errors.InvalidDocumentIdError("doc_ids can't be empty!")

for _id in doc_ids:
validation.validate_id(_id)

# TODO: change to timer()
t0 = datetime.datetime.utcnow()
delete_res_backend = HttpRequests(config=config).post(
path=f"{index_name}/_delete_by_query", body={
"query": {
"terms": {
"_id": doc_ids
}
}
}
)
if auto_refresh:
refresh_response = HttpRequests(config).post(path=F"{index_name}/_refresh")
t1 = datetime.datetime.utcnow()
delete_res = {
"index_name": index_name, "status": "succeeded",
"type": "documentDeletion", "details": {
"receivedDocumentIds": len(doc_ids),
"deletedDocuments": delete_res_backend["deleted"],
},
"duration": utils.create_duration_string(t1 - t0),
"startedAt": utils.format_timestamp(t0),
"finishedAt": utils.format_timestamp(t1),
}
return delete_res


def refresh_index(config: Config, index_name: str):
return HttpRequests(config).post(path=F"{index_name}/_refresh")

@@ -2593,4 +2558,15 @@ def _create_score_modifiers_tensor_search_query(result_count, offset, vector_fie
"exclude": ["__chunks.__vector_*"]
}
}
return search_query
return search_query


def delete_documents(config: Config, index_name: str, doc_ids: List[str], auto_refresh):
"""Delete documents from the Marqo index with the given doc_ids """
return delete_docs.delete_documents(
        config=config,
        del_request=delete_docs.MqDeleteDocsRequest(
            index_name=index_name,
            document_ids=doc_ids,
            auto_refresh=auto_refresh)
    )

Review comment (Contributor):

When we get around to an overall reformatting of tensor_search/, it may be worth having format_delete_docs_response in this function instead. That way the operation-specific files deal in Pydantic objects, and the tensor_search.py operations are responsible for mapping args -> RequestObject, then ResponseObject -> Dict.

Review comment (Contributor):

Or have the decoding within the pydantic model (i.e. controlled via the .json method).
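
A minimal sketch of what that suggestion could look like (hypothetical, not part of this PR; it assumes a pydantic BaseModel in place of the current NamedTuple and reuses utils.create_duration_string / utils.format_timestamp from this codebase):

from datetime import datetime
from typing import List, Literal

from pydantic import BaseModel

from marqo.tensor_search import utils


class MqDeleteDocsResponse(BaseModel):
    """Operation-specific response object owned by delete_docs.py (sketch only)."""
    index_name: str
    status_string: Literal["succeeded"]
    document_ids: List[str]
    deleted_documents_count: int
    deletion_start: datetime
    deletion_end: datetime

    def dict(self, **kwargs) -> dict:
        # Decode to the user-facing response shape here, so callers simply call
        # .dict() / .json() instead of a separate format_delete_docs_response helper.
        return {
            "index_name": self.index_name,
            "status": self.status_string,
            "type": "documentDeletion",
            "details": {
                "receivedDocumentIds": len(self.document_ids),
                "deletedDocuments": self.deleted_documents_count,
            },
            "duration": utils.create_duration_string(self.deletion_end - self.deletion_start),
            "startedAt": utils.format_timestamp(self.deletion_start),
            "finishedAt": utils.format_timestamp(self.deletion_end),
        }

Under that split, delete_docs.delete_documents would return the response object itself, and tensor_search.delete_documents would stay responsible for mapping its arguments into an MqDeleteDocsRequest and calling .dict() on the result.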
22 changes: 22 additions & 0 deletions tests/tensor_search/test_config.py
@@ -76,3 +76,25 @@ def run():

def test_device_for_clip(self):
assert str(enums.Device.cpu) == "cpu"


class TestConfigBackend(MarqoTestCase):

def setUp(self) -> None:
self.endpoint = self.authorized_url

class CustomSearchDb:
opensearch = "opensearch"
elasticsearch = "elasticsearch"

def test_init_default_backend(self):
c = config.Config(url=self.endpoint)
assert c.backend == enums.SearchDb.opensearch

def test_init_custom_backend(self):
c = config.Config(url=self.endpoint, backend=self.CustomSearchDb.elasticsearch)
assert c.backend == "elasticsearch"

def test_init_custom_backend_as_string(self):
c = config.Config(url=self.endpoint, backend="elasticsearch")
assert c.backend == "elasticsearch"