Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the IndexError in _filter_operational_batches_sequence caused by a race condition in parallel sequencing batches and pruning old batches. #83

Merged

Conversation

AlirezaRoshanzamir
Copy link
Collaborator

@AlirezaRoshanzamir AlirezaRoshanzamir commented Feb 12, 2025

The following error has been observed when using batch aggregator proxies with four workers and a large number of batches (e.g., 1,000,000) sent concurrently:

image

The source of the issue is a race condition between sequencing new batches and pruning old finalized batches. Consider the following situation:

  • Pruning of old finalized batches is called, and the right side of the first assignment is executed:

    zsequencer/common/db.py

    Lines 248 to 254 in 62cc9a4

    def _prune_old_finalized_batches(self, app_name: str, border_index: int) -> None:
    self.apps[app_name]["operational_batches_sequence"] = (
    self._filter_operational_batches_sequence(
    app_name,
    portion.open(border_index, portion.inf),
    )
    )
  • Before the assignment, new batches are upserted using the following function:

    zsequencer/common/db.py

    Lines 376 to 412 in 62cc9a4

    def sequencer_init_batches(
    self, app_name: str, initializing_batches: list[Batch]
    ) -> None:
    """Initialize and sequence batches."""
    if not initializing_batches:
    return
    last_sequenced_batch = self.apps[app_name]["last_sequenced_batch"]
    chaining_hash = last_sequenced_batch.get("chaining_hash", "")
    index = last_sequenced_batch.get("index", self._BEFORE_GLOBAL_FIRST_BATCH_INDEX)
    for batch in initializing_batches:
    if self._batch_exists(app_name, batch["hash"]):
    continue
    batch_hash = utils.gen_hash(batch["body"])
    if batch["hash"] != batch_hash:
    zlogger.warning(
    f"Invalid batch hash: expected {batch_hash} got {batch['hash']}"
    )
    continue
    index += 1
    chaining_hash = utils.gen_hash(chaining_hash + batch_hash)
    batch.update(
    {
    "state": "sequenced",
    "index": index,
    "chaining_hash": chaining_hash,
    }
    )
    self.apps[app_name]["operational_batches_sequence"].append(batch)
    self.apps[app_name]["operational_batches_hash_index_map"][batch_hash] = (
    batch["index"]
    )
    self.apps[app_name]["last_sequenced_batch"] = batch
  • Immediately after, the pruning assignment is executed, and the operational batches sequence is filled with the pruned version of the operational batches sequence before the new sequenced batches. As a result, the newly added batches are completely ignored, while the last_sequenced_batch field points to a non-existent batch.
  • The invalid last_sequenced_batch value is then used to retrieve the index of the latest sequenced batch, but since it points to a batch that does not exist, a gap is created in the operational batches sequence. This results in an IndexError when attempting to access the operational batches sequence by index.

I've fixed the issue by implementing a lazy evaluation approach when slicing the operational batches sequence based on intervals. Specifically, when a caller of _filter_operational_batches_sequence passes an interval with portion.inf or -portion.inf (or any value ≤ 1), it indicates that the caller wants all batches up to the end (for portion.inf) or from the start (for -portion.inf or values ≤ 1).

Previously, I intersected those bounds with the operational batches sequence's max and min indices. Now, I convert the interval into a Python built-in slice (with support for infinite bounds) and use that to slice the operational batches sequence. In this way, the following line indicates a range from the specified border to the end of the operational batches sequence and is evaluated exactly as written:

zsequencer/common/db.py

Lines 248 to 254 in 62cc9a4

def _prune_old_finalized_batches(self, app_name: str, border_index: int) -> None:
self.apps[app_name]["operational_batches_sequence"] = (
self._filter_operational_batches_sequence(
app_name,
portion.open(border_index, portion.inf),
)
)

@AlirezaRoshanzamir AlirezaRoshanzamir marked this pull request as ready for review February 15, 2025 18:55
@AlirezaRoshanzamir AlirezaRoshanzamir merged commit 33cac9f into dev Feb 15, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant