Partial update (#220)
Support for partial document update
wanliAlex authored Mar 6, 2024
1 parent e718e08 commit 2a340f5
Showing 2 changed files with 100 additions and 2 deletions.
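For orientation, a rough usage sketch of the partial-update API added here (the index name, endpoint URL, and field values are made up for illustration; in Marqo each document targets an existing document by its _id and only the supplied fields are changed):

import marqo

mq = marqo.Client(url="http://localhost:8882")  # hypothetical local Marqo endpoint

# Unbatched: the whole list goes to Marqo in a single PATCH request.
mq.index("my-index").update_documents([
    {"_id": "doc1", "price": 19.99},
    {"_id": "doc2", "in_stock": False},
])

# Batched: the client splits the payload into chunks of 64 documents,
# sending one PATCH request per chunk.
many_docs = [{"_id": str(i), "views": i} for i in range(500)]
mq.index("my-index").update_documents(many_docs, client_batch_size=64)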
11 changes: 9 additions & 2 deletions src/marqo/_httprequests.py
@@ -12,15 +12,16 @@
BackendTimeoutError
)

- HTTP_OPERATIONS = Literal["delete", "get", "post", "put"]
+ HTTP_OPERATIONS = Literal["delete", "get", "post", "put", "patch"]
ALLOWED_OPERATIONS: Tuple[HTTP_OPERATIONS, ...] = get_args(HTTP_OPERATIONS)
session = requests.Session()

OPERATION_MAPPING = {
'delete': session.delete,
'get': session.get,
'post': session.post,
- 'put': session.put
+ 'put': session.put,
+ 'patch': session.patch
}


@@ -118,6 +119,12 @@ def delete(
) -> Any:
return self.send_request('delete', path, body, index_name=index_name)

def patch(self,
path: str,
body: Optional[Union[Dict[str, Any], List[Dict[str, Any]], List[str], str]] = None,
index_name: str = "") -> Any:
return self.send_request('patch', path, body, index_name=index_name)

@staticmethod
def __to_json(
request: requests.Response
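The _httprequests.py change is mechanical: the client dispatches through OPERATION_MAPPING, so "patch" has to be registered in both the Literal and the mapping before Index.update_documents can issue PATCH calls. A minimal sketch of that dispatch, assuming it sits in the same module as the mapping above (the real send_request also builds headers, encodes the JSON body, and translates errors):

# Illustration only; not the actual HttpRequests.send_request.
def dispatch(http_operation: str, url: str, body=None):
    if http_operation not in ALLOWED_OPERATIONS:
        raise ValueError(f"{http_operation} is not a supported HTTP operation")
    # With this commit, OPERATION_MAPPING["patch"] resolves to session.patch.
    return OPERATION_MAPPING[http_operation](url, data=body)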
91 changes: 91 additions & 0 deletions src/marqo/index.py
Expand Up @@ -451,6 +451,97 @@ def _add_docs_organiser(
mq_logger.debug(f"add_documents completed. total time taken: {(total_add_docs_time):.3f}s.")
return res

def update_documents(self, documents: List[Dict], client_batch_size: Optional[int] = None) \
        -> Union[Dict[str, Any], List[Dict[str, Any]]]:
"""Update documents in this index. Does a partial update on existing documents."""

t0 = timer()

error_detected_message = ('Errors detected in update_documents call. '
'Please examine the returned result object for more information.')

if client_batch_size is not None:
if (not isinstance(client_batch_size, int)) or client_batch_size <= 0:
raise errors.InvalidArgError("Batch size must be a positive integer")
res = self._batch_update_documents(documents, client_batch_size)
else:
start_time_client_request = timer()
num_docs = len(documents)

base_path = f"indexes/{self.index_name}/documents"
body = {"documents": documents}

res = self.http.patch(
path=base_path, body=body, index_name=self.index_name,
)
end_time_client_request = timer()
total_client_request_time = end_time_client_request - start_time_client_request

mq_logger.debug(f"update_documents roundtrip: took {(total_client_request_time):.3f}s to send {num_docs} "
f"docs to Marqo (roundtrip, unbatched).")
errors_detected = False

if 'processingTimeMs' in res: # Only outputs log if response is non-empty
mq_logger.debug(
f"update_documents Marqo index: took {(res['processingTimeMs'] / 1000):.3f}s for Marqo to process & index {num_docs} "
f"docs.")
if 'errors' in res and res['errors']:
    errors_detected = True
if errors_detected:
    mq_logger.info(error_detected_message)
total_update_docs_time = timer() - t0
mq_logger.debug(f"update_documents completed. total time taken: {(total_update_docs_time):.3f}s.")
return res

def _update_documents(self, documents: List[Dict]) -> Dict[str, Any]:
"""Update documents in this index. Does a partial update on existing documents."""
base_path = f"indexes/{self.index_name}/documents/update"
return self.http.post(path=base_path, body=documents, index_name=self.index_name,)

def _batch_update_documents(self, documents: List[Dict], client_batch_size: int) -> List[Dict[str, Any]]:
"""Update documents in this index with batched requests. Does a partial update on existing documents."""

deeper = ((doc, i, client_batch_size) for i, doc in enumerate(documents))
base_path = f"indexes/{self.index_name}/documents"

error_detected_message = ('Errors detected in update_documents call. '
'Please examine the returned result object for more information.')

def batch_requests(gathered, doc_tuple):
doc, i, the_batch_size = doc_tuple
if i % the_batch_size == 0:
gathered.append([doc, ])
else:
gathered[-1].append(doc)
return gathered

batched = functools.reduce(batch_requests, deeper, [])

def update_batch_documents(batch_number, docs):
errors_detected = False

t0 = timer()

body = {"documents": docs}
res = self.http.patch(path=base_path, body=body, index_name=self.index_name)

total_batch_time = timer() - t0
num_docs = len(docs)

if 'processingTimeMs' in res: # Only outputs log if response is non-empty
mq_logger.info(
f" update_documents batch {batch_number}: took {(res['processingTimeMs'] / 1000):.3f}s "
f"for Marqo to process & index {num_docs} docs. Roundtrip time: {(total_batch_time):.3f}s.")
if 'errors' in res and res['errors']:
errors_detected = True

if errors_detected:
mq_logger.info(f" update_documents batch {batch_number}: {error_detected_message}")
return res

results = [update_batch_documents(batch_number, docs) for batch_number, docs in enumerate(batched)]
mq_logger.debug('completed batch ingestion.')
return results

def delete_documents(self, ids: List[str]) -> Dict[str, int]:
"""Delete documents from this index by a list of their ids.
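To make the batching concrete: the functools.reduce in _batch_update_documents groups the documents into consecutive chunks of client_batch_size, and each chunk becomes one PATCH request. The same grouping run standalone, with five toy documents and a batch size of 2:

import functools

docs = [{"_id": str(i), "views": i} for i in range(5)]
client_batch_size = 2

def batch_requests(gathered, doc_tuple):
    doc, i, the_batch_size = doc_tuple
    if i % the_batch_size == 0:
        gathered.append([doc])   # an index divisible by the batch size starts a new batch
    else:
        gathered[-1].append(doc)
    return gathered

deeper = ((doc, i, client_batch_size) for i, doc in enumerate(docs))
batched = functools.reduce(batch_requests, deeper, [])
# batched == [[docs[0], docs[1]], [docs[2], docs[3]], [docs[4]]] -> three PATCH requests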
