From 67799173be3217abf4cf040c1fa67798603372ef Mon Sep 17 00:00:00 2001 From: Alireza Roshanzamir Date: Wed, 22 Jan 2025 16:14:11 +0330 Subject: [PATCH 01/13] Apply black on db.py file. --- common/db.py | 123 ++++++++++++++++++++++++++++++++++----------------- 1 file changed, 82 insertions(+), 41 deletions(-) diff --git a/common/db.py b/common/db.py index 4262363..55afadf 100644 --- a/common/db.py +++ b/common/db.py @@ -75,7 +75,9 @@ def schedule_fetch(self) -> None: try: zconfig.fetch_network_state() except: - zlogger.error("An unexpected error occurred while fetching network state") + zlogger.error( + "An unexpected error occurred while fetching network state" + ) time.sleep(zconfig.FETCH_APPS_AND_NODES_INTERVAL) @@ -83,9 +85,15 @@ def load_state(self) -> None: """Load the initial state from the snapshot files.""" for app_name in getattr(zconfig, "APPS", []): - finalized_batches: dict[str, dict[str, Any]] = self.load_finalized_batches(app_name) + finalized_batches: dict[str, dict[str, Any]] = self.load_finalized_batches( + app_name + ) last_finalized_batch: dict[str, Any] = max( - (batch for batch in finalized_batches.values() if batch.get("finalization_signature")), + ( + batch + for batch in finalized_batches.values() + if batch.get("finalization_signature") + ), key=lambda batch: batch["index"], default={}, ) @@ -110,7 +118,9 @@ def load_keys() -> dict[str, Any]: return {} @staticmethod - def load_finalized_batches(app_name: str, index: int | None = None) -> dict[str, Any]: + def load_finalized_batches( + app_name: str, index: int | None = None + ) -> dict[str, Any]: """Load finalized batches for a given app from the snapshot file.""" snapshot_dir: str = os.path.join( zconfig.SNAPSHOT_PATH, zconfig.VERSION, app_name @@ -118,11 +128,10 @@ def load_finalized_batches(app_name: str, index: int | None = None) -> dict[str, if index is None: index = 0 snapshots = sorted( - file for file in os.listdir(snapshot_dir) - if file.endswith(".json.gz") + file for file in os.listdir(snapshot_dir) if file.endswith(".json.gz") ) if snapshots: - index = int(snapshots[-1].split('.')[0]) + index = int(snapshots[-1].split(".")[0]) else: index = math.ceil(index / zconfig.SNAPSHOT_CHUNK) * zconfig.SNAPSHOT_CHUNK @@ -130,8 +139,9 @@ def load_finalized_batches(app_name: str, index: int | None = None) -> dict[str, return {} try: - with gzip.open(snapshot_dir + f"/{str(index).zfill(7)}.json.gz" - , "rt", encoding="UTF-8") as file: + with gzip.open( + snapshot_dir + f"/{str(index).zfill(7)}.json.gz", "rt", encoding="UTF-8" + ) as file: return json.load(file) except (FileNotFoundError, EOFError): pass @@ -153,9 +163,7 @@ def save_snapshot(self, app_name: str, index: int) -> None: snapshot_dir: str = os.path.join( zconfig.SNAPSHOT_PATH, zconfig.VERSION, app_name ) - self.save_batches_to_file( - app_name, index, snapshot_border, snapshot_dir - ) + self.save_batches_to_file(app_name, index, snapshot_border, snapshot_dir) self.prune_old_batches(app_name, remove_border) except Exception as error: zlogger.exception( @@ -166,17 +174,18 @@ def save_snapshot(self, app_name: str, index: int) -> None: ) def save_batches_to_file( - self, app_name: str, index: int, snapshot_border: int, snapshot_dir + self, app_name: str, index: int, snapshot_border: int, snapshot_dir ) -> None: """Helper function to save batches to a snapshot file.""" - with gzip.open(snapshot_dir + f"/{str(index).zfill(7)}.json.gz", - "wt", encoding="UTF-8") as file: + with gzip.open( + snapshot_dir + f"/{str(index).zfill(7)}.json.gz", "wt", encoding="UTF-8" + ) as file: json.dump( { batch["hash"]: batch for batch in self.apps[app_name]["batches"].values() if batch["state"] == "finalized" - and snapshot_border < batch["index"] <= index + and snapshot_border < batch["index"] <= index }, file, ) @@ -190,7 +199,12 @@ def prune_old_batches(self, app_name: str, remove_border: int) -> None: } def __process_batches( - self, loaded_batches: dict[str, Any], states: set[str], after: float, batches: dict[str, Any]) -> int: + self, + loaded_batches: dict[str, Any], + states: set[str], + after: float, + batches: dict[str, Any], + ) -> int: """Filter and add batches to the result based on state and index.""" # fixme: sort should be removed after updating batches dict to list sorter = lambda batch: batch.get("index", 0) @@ -200,19 +214,29 @@ def __process_batches( if batch["state"] in states and batch.get("index", 0) > after: batches[batch["hash"]] = batch - def get_batches(self, app_name: str, states: set[str], after: float = -1) -> dict[str, Any]: + def get_batches( + self, app_name: str, states: set[str], after: float = -1 + ) -> dict[str, Any]: """Get batches filtered by state and optionally by index.""" batches: dict[str, Any] = {} - last_finalized_index = self.apps[app_name]["last_finalized_batch"].get("index", 0) + last_finalized_index = self.apps[app_name]["last_finalized_batch"].get( + "index", 0 + ) current_chunk = math.ceil((after + 1) / zconfig.SNAPSHOT_CHUNK) - next_chunk = math.ceil((after + 1 + zconfig.API_BATCHES_LIMIT) / zconfig.SNAPSHOT_CHUNK) + next_chunk = math.ceil( + (after + 1 + zconfig.API_BATCHES_LIMIT) / zconfig.SNAPSHOT_CHUNK + ) finalized_chunk = math.ceil(last_finalized_index / zconfig.SNAPSHOT_CHUNK) if current_chunk != finalized_chunk: loaded_batches = self.load_finalized_batches(app_name, after + 1) self.__process_batches(loaded_batches, states, after, batches) - if len(batches) < zconfig.API_BATCHES_LIMIT and \ - next_chunk not in [current_chunk, finalized_chunk]: - loaded_batches = self.load_finalized_batches(app_name, after + 1 + len(batches)) + if len(batches) < zconfig.API_BATCHES_LIMIT and next_chunk not in [ + current_chunk, + finalized_chunk, + ]: + loaded_batches = self.load_finalized_batches( + app_name, after + 1 + len(batches) + ) self.__process_batches(loaded_batches, states, after, batches) self.__process_batches(self.apps[app_name]["batches"], states, after, batches) return batches @@ -254,13 +278,17 @@ def get_last_batch(self, app_name: str, state: str) -> dict[str, Any]: """Get the last batch for a given state.""" return self.apps.get(app_name, {}).get(f"last_{state}_batch", {}) - def sequencer_init_batches(self, app_name: str, batches_data: list[dict[str, Any]]) -> None: + def sequencer_init_batches( + self, app_name: str, batches_data: list[dict[str, Any]] + ) -> None: """Initialize and sequence batches.""" if not batches_data: return batches: dict[str, Any] = self.apps[app_name]["batches"] - last_sequenced_batch: dict[str, Any] = self.apps[app_name]["last_sequenced_batch"] + last_sequenced_batch: dict[str, Any] = self.apps[app_name][ + "last_sequenced_batch" + ] chaining_hash: str = last_sequenced_batch.get("chaining_hash", "") index: int = last_sequenced_batch.get("index", 0) @@ -312,12 +340,20 @@ def upsert_sequenced_batches(self, app_name: str, batches_data: list[str]) -> No def update_locked_batches(self, app_name: str, sig_data: dict[str, Any]) -> None: """Update batches to 'locked' state up to a specified index.""" - if sig_data["index"] <= self.apps[app_name]["last_locked_batch"].get("index", 0): + if sig_data["index"] <= self.apps[app_name]["last_locked_batch"].get( + "index", 0 + ): return batches: dict[str, Any] = self.apps[app_name]["batches"] # fixme: sort should be removed after updating batches dict to list - for batch in sorted(list(batches.values()), key=lambda batch: batch.get("index", 0)): - if "index" in batch and batch["index"] <= sig_data["index"] and batch["state"] != "finalized": + for batch in sorted( + list(batches.values()), key=lambda batch: batch.get("index", 0) + ): + if ( + "index" in batch + and batch["index"] <= sig_data["index"] + and batch["state"] != "finalized" + ): batch["state"] = "locked" if not batches.get(sig_data["hash"]): return @@ -332,7 +368,7 @@ def update_locked_batches(self, app_name: str, sig_data: dict[str, Any]) -> None def update_finalized_batches(self, app_name: str, sig_data: dict[str, Any]) -> None: """Update batches to 'finalized' state up to a specified index and save snapshots.""" if sig_data.get("index", 0) <= self.apps[app_name]["last_finalized_batch"].get( - "index", 0 + "index", 0 ): return @@ -340,10 +376,15 @@ def update_finalized_batches(self, app_name: str, sig_data: dict[str, Any]) -> N snapshot_indexes: list[int] = [] # fixme: sort should be removed after updating batches dict to list - for batch in sorted(list(batches.values()), key=lambda batch: batch.get("index", 0)): + for batch in sorted( + list(batches.values()), key=lambda batch: batch.get("index", 0) + ): if "index" in batch and batch["index"] <= sig_data["index"]: batch["state"] = "finalized" - if batch["state"] == "finalized" and batch["index"] % zconfig.SNAPSHOT_CHUNK == 0: + if ( + batch["state"] == "finalized" + and batch["index"] % zconfig.SNAPSHOT_CHUNK == 0 + ): snapshot_indexes.append(batch["index"]) if not batches.get(sig_data["hash"]): @@ -361,8 +402,8 @@ def update_finalized_batches(self, app_name: str, sig_data: dict[str, Any]) -> N self.save_snapshot(app_name, snapshot_index) def upsert_node_state( - self, - node_state: dict[str, Any], + self, + node_state: dict[str, Any], ) -> None: """Upsert the state of a node.""" if not node_state["sequenced_index"]: @@ -388,7 +429,7 @@ def upsert_locked_sync_point(self, app_name: str, state: dict[str, Any]) -> None "hash": state["hash"], "signature": state["signature"], "nonsigners": state["nonsigners"], - "tag": state["tag"] + "tag": state["tag"], } def upsert_finalized_sync_point(self, app_name: str, state: dict[str, Any]) -> None: @@ -399,7 +440,7 @@ def upsert_finalized_sync_point(self, app_name: str, state: dict[str, Any]) -> N "hash": state["hash"], "signature": state["signature"], "nonsigners": state["nonsigners"], - "tag": state["tag"] + "tag": state["tag"], } def get_locked_sync_point(self, app_name: str) -> dict[str, Any]: @@ -434,10 +475,10 @@ def has_missed_batches(self) -> bool: return False def reinitialize_db( - self, - app_name: str, - new_sequencer_id: str, - all_nodes_last_finalized_batch: dict[str, Any], + self, + app_name: str, + new_sequencer_id: str, + all_nodes_last_finalized_batch: dict[str, Any], ): """Reinitialize the database after a switch in the sequencer.""" self.apps[app_name]["last_sequenced_batch"] = all_nodes_last_finalized_batch @@ -458,7 +499,7 @@ def reinitialize_db( self.reinitialize_batches(app_name, all_nodes_last_finalized_batch) def resequence_batches( - self, app_name: str, all_nodes_last_finalized_batch: dict[str, Any] + self, app_name: str, all_nodes_last_finalized_batch: dict[str, Any] ) -> None: """Resequence batches after a switch in the sequencer.""" keys_to_retain: set[str] = {"app_name", "node_id", "timestamp", "hash", "body"} @@ -493,7 +534,7 @@ def reset_timestamps(self, app_name: str) -> None: batch["timestamp"] = int(time.time()) def reinitialize_batches( - self, app_name: str, all_nodes_last_finalized_batch: dict[str, Any] + self, app_name: str, all_nodes_last_finalized_batch: dict[str, Any] ) -> None: """Reinitialize batches after a switch in the sequencer.""" keys_to_retain: set[str] = { From 09bf3aa88f748ed972c917fe6a1efb75c94a755b Mon Sep 17 00:00:00 2001 From: Alireza Roshanzamir Date: Wed, 29 Jan 2025 01:02:13 +0330 Subject: [PATCH 02/13] Use sequence for the operational batches instead of a map. --- common/db.py | 819 ++++++++++++++++++++++++++++++-------------- node/routes.py | 31 +- node/tasks.py | 47 ++- run.py | 5 + sequencer/routes.py | 17 +- sequencer/tasks.py | 8 +- 6 files changed, 615 insertions(+), 312 deletions(-) diff --git a/common/db.py b/common/db.py index 55afadf..affc898 100644 --- a/common/db.py +++ b/common/db.py @@ -1,3 +1,4 @@ +from __future__ import annotations import gzip import json import math @@ -5,52 +6,122 @@ import threading import time from typing import Any -import requests from threading import Thread from config import zconfig from utils import get_file_content from . import utils from .logger import zlogger - - +from typing import Literal +import itertools +import portion # type: ignore[import-untyped] +from typing import TypedDict, Iterable +import typeguard +import debugpy # type: ignore[import-untyped] +import random +import sys + +State = Literal["initialized", "sequenced", "locked", "finalized"] +OperationalState = Literal["sequenced", "locked", "finalized"] + + +class Batch(TypedDict, total=False): + app_name: str + node_id: str + timestamp: int + body: str + index: int + hash: str + state: State + chaining_hash: str + lock_signature: str + locked_nonsigners: list[str] + locked_tag: int + finalization_signature: str + finalized_nonsigners: list[str] + finalized_tag: int + + +class SignatureData(TypedDict, total=False): + index: int + chaining_hash: str + hash: str + signature: str + nonsigners: list[str] + tag: int + + +class App(TypedDict, total=False): + nodes_state: dict[str, Any] + initialized_batches_map: dict[str, Batch] + operational_batches_sequence: list[Batch] + operational_batches_hash_index_map: dict[str, int] + missed_batches_map: dict[str, Batch] + last_sequenced_batch: Batch + last_locked_batch: Batch + last_finalized_batch: Batch + + +@typeguard.typechecked class InMemoryDB: """A thread-safe singleton in-memory database class to manage batches of transactions and states for apps.""" - _instance: "InMemoryDB | None" = None + # TODO: We only have one instantiation of this class, why we use locking and singleton here? + _instance: InMemoryDB | None = None instance_lock: threading.Lock = threading.Lock() - def __new__(cls) -> "InMemoryDB": + def __new__(cls) -> InMemoryDB: """Singleton pattern implementation to ensure only one instance exists.""" with cls.instance_lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialize() - fetch_data = Thread(target=cls._instance.schedule_fetch) - fetch_data.start() + fetching_thread = Thread( + target=cls._instance._fetch_apps_and_network_state_periodically + ) + fetching_thread.start() return cls._instance def _initialize(self) -> None: """Initialize the InMemoryDB instance.""" + # if "1" in sys.argv: + # listening_port = 5678 + # zlogger.error(f"Listening on {listening_port}") + # debugpy.listen(("0.0.0.0", listening_port)) + # debugpy.wait_for_client() + # debugpy.breakpoint() + self.sequencer_put_batches_lock = threading.Lock() self.pause_node = threading.Event() - self.last_stored_index = 0 - self.apps: dict[str, Any] = {} - self.is_sequencer_down: bool = False - self.load_state() - - def fetch_apps(self) -> None: + self._last_saved_index = 0 + self.is_sequencer_down = False + self.apps = self._load_finalized_batches_for_all_apps() + + # def on_change() -> None: + # for app_name, app in self.apps.items(): + # zlogger.warning( + # "s: {}, l: {}, f: {}".format( + # app.get("last_sequenced_batch", {}).get("index", -1), + # app.get("last_locked_batch", {}).get("index", -1), + # app.get("last_finalized_batch", {}).get("index", -1), + # ) + # ) + # self.apps.bind_callback(on_change) + + def _fetch_apps(self) -> None: """Fetchs the apps data.""" data = get_file_content(zconfig.APPS_FILE) - new_apps = {} + new_apps: dict[str, App] = {} for app_name in data: if app_name in self.apps: new_apps[app_name] = self.apps[app_name] else: new_apps[app_name] = { "nodes_state": {}, - "batches": {}, - "missed_batches": {}, + "initialized_batches_map": {}, + "operational_batches_sequence": [], + "operational_batches_hash_index_map": {}, + "missed_batches_map": {}, "last_sequenced_batch": {}, "last_locked_batch": {}, "last_finalized_batch": {}, @@ -59,16 +130,16 @@ def fetch_apps(self) -> None: zconfig.APPS.update(data) self.apps.update(new_apps) for app_name in zconfig.APPS: - snapshot_path: str = os.path.join( + snapshot_path = os.path.join( zconfig.SNAPSHOT_PATH, zconfig.VERSION, app_name ) os.makedirs(snapshot_path, exist_ok=True) - def schedule_fetch(self) -> None: + def _fetch_apps_and_network_state_periodically(self) -> None: """Periodically fetches apps and nodes data.""" while True: try: - self.fetch_apps() + self._fetch_apps() except: zlogger.error("An unexpected error occurred while fetching apps data") @@ -81,34 +152,41 @@ def schedule_fetch(self) -> None: time.sleep(zconfig.FETCH_APPS_AND_NODES_INTERVAL) - def load_state(self) -> None: - """Load the initial state from the snapshot files.""" + @classmethod + def _load_finalized_batches_for_all_apps(cls) -> dict[str, App]: + """Load and return the initial state from the snapshot files.""" + result: dict[str, App] = {} + # TODO: Replace with dot operator. for app_name in getattr(zconfig, "APPS", []): - finalized_batches: dict[str, dict[str, Any]] = self.load_finalized_batches( - app_name - ) - last_finalized_batch: dict[str, Any] = max( + finalized_batches = cls._load_finalized_batches(app_name) + last_finalized_batch: Batch = next( ( batch - for batch in finalized_batches.values() + for batch in reversed(finalized_batches) if batch.get("finalization_signature") ), - key=lambda batch: batch["index"], - default={}, + {}, ) - self.apps[app_name] = { + result[app_name] = { "nodes_state": {}, - "batches": finalized_batches, - "missed_batches": {}, + "initialized_batches_map": {}, + "operational_batches_sequence": finalized_batches, + "operational_batches_hash_index_map": cls._generate_batches_hash_index_map( + finalized_batches + ), + "missed_batches_map": {}, # TODO: store batch hash instead of batch "last_sequenced_batch": last_finalized_batch, "last_locked_batch": last_finalized_batch, "last_finalized_batch": last_finalized_batch, } + return result + + # NOTE: Unused method. @staticmethod - def load_keys() -> dict[str, Any]: + def _load_keys() -> dict[str, Any]: """Load keys from the snapshot file.""" keys_path: str = os.path.join(zconfig.SNAPSHOT_PATH, "keys.json.gz") try: @@ -117,30 +195,32 @@ def load_keys() -> dict[str, Any]: except (OSError, IOError, json.JSONDecodeError): return {} - @staticmethod - def load_finalized_batches( - app_name: str, index: int | None = None - ) -> dict[str, Any]: + @classmethod + def _load_finalized_batches( + cls, app_name: str, index: int | None = None + ) -> list[Batch]: """Load finalized batches for a given app from the snapshot file.""" - snapshot_dir: str = os.path.join( - zconfig.SNAPSHOT_PATH, zconfig.VERSION, app_name - ) + snapshot_dir = os.path.join(zconfig.SNAPSHOT_PATH, zconfig.VERSION, app_name) if index is None: - index = 0 + effective_index = 0 snapshots = sorted( file for file in os.listdir(snapshot_dir) if file.endswith(".json.gz") ) if snapshots: - index = int(snapshots[-1].split(".")[0]) + effective_index = int(snapshots[-1].split(".")[0]) else: - index = math.ceil(index / zconfig.SNAPSHOT_CHUNK) * zconfig.SNAPSHOT_CHUNK + effective_index = ( + math.ceil(index / zconfig.SNAPSHOT_CHUNK) * zconfig.SNAPSHOT_CHUNK + ) - if index <= 0: - return {} + if effective_index <= 0: + return [] try: with gzip.open( - snapshot_dir + f"/{str(index).zfill(7)}.json.gz", "rt", encoding="UTF-8" + snapshot_dir + f"/{str(effective_index).zfill(7)}.json.gz", + "rt", + encoding="UTF-8", ) as file: return json.load(file) except (FileNotFoundError, EOFError): @@ -151,20 +231,17 @@ def load_finalized_batches( app_name, error, ) - return {} + return [] - def save_snapshot(self, app_name: str, index: int) -> None: + def _save_snapshot_then_prune(self, app_name: str, index: int) -> None: """Save a snapshot of the finalized batches to a file.""" - snapshot_border: int = index - zconfig.SNAPSHOT_CHUNK - remove_border: int = max( + snapshot_border = index - zconfig.SNAPSHOT_CHUNK + remove_border = max( index - zconfig.SNAPSHOT_CHUNK * zconfig.REMOVE_CHUNK_BORDER, 0 ) try: - snapshot_dir: str = os.path.join( - zconfig.SNAPSHOT_PATH, zconfig.VERSION, app_name - ) - self.save_batches_to_file(app_name, index, snapshot_border, snapshot_dir) - self.prune_old_batches(app_name, remove_border) + self._save_finalized_batches(app_name, index, snapshot_border) + self._prune_initialized_and_old_operational_batches(app_name, remove_border) except Exception as error: zlogger.exception( "An error occurred while saving snapshot for %s at index %d: %s", @@ -173,52 +250,51 @@ def save_snapshot(self, app_name: str, index: int) -> None: error, ) - def save_batches_to_file( - self, app_name: str, index: int, snapshot_border: int, snapshot_dir + def _save_finalized_batches( + self, app_name: str, index: int, snapshot_border: int ) -> None: """Helper function to save batches to a snapshot file.""" + snapshot_dir = os.path.join(zconfig.SNAPSHOT_PATH, zconfig.VERSION, app_name) with gzip.open( snapshot_dir + f"/{str(index).zfill(7)}.json.gz", "wt", encoding="UTF-8" ) as file: json.dump( - { - batch["hash"]: batch - for batch in self.apps[app_name]["batches"].values() - if batch["state"] == "finalized" - and snapshot_border < batch["index"] <= index - }, + self._filter_operational_batches_sequence( + app_name, + self._get_batch_index_interval(app_name, "finalized") + & portion.openclosed(snapshot_border, index), + ), file, ) - def prune_old_batches(self, app_name: str, remove_border: int) -> None: + def _prune_initialized_and_old_operational_batches( + self, app_name: str, remove_border: int + ) -> None: """Helper function to prune old batches from memory.""" - self.apps[app_name]["batches"] = { - batch["hash"]: batch - for batch in self.apps[app_name]["batches"].values() - if batch["state"] != "finalized" or batch["index"] > remove_border - } + self.apps[app_name]["initialized_batches_map"] = {} + self.apps[app_name]["operational_batches_sequence"] = ( + self._filter_operational_batches_sequence( + app_name, + self._get_batch_index_interval(app_name, state="finalized").complement() + | portion.open(remove_border, portion.inf), + ) + ) + self.apps[app_name]["operational_batches_hash_index_map"] = ( + self._generate_batches_hash_index_map( + self.apps[app_name]["operational_batches_sequence"] + ) + ) - def __process_batches( - self, - loaded_batches: dict[str, Any], - states: set[str], - after: float, - batches: dict[str, Any], - ) -> int: - """Filter and add batches to the result based on state and index.""" - # fixme: sort should be removed after updating batches dict to list - sorter = lambda batch: batch.get("index", 0) - for batch in sorted(list(loaded_batches.values()), key=sorter): - if len(batches) >= zconfig.API_BATCHES_LIMIT: - return - if batch["state"] in states and batch.get("index", 0) > after: - batches[batch["hash"]] = batch - - def get_batches( - self, app_name: str, states: set[str], after: float = -1 - ) -> dict[str, Any]: + def get_initialized_batches_map(self, app_name: str) -> dict[str, Batch]: + return self.apps[app_name]["initialized_batches_map"] + + def get_entire_operational_batches_sequence( + self, app_name: str, states: set[OperationalState], after: int = 0 + ) -> list[Batch]: """Get batches filtered by state and optionally by index.""" - batches: dict[str, Any] = {} + batches_sequence: list[Batch] = [] + batches_hash_set: set[str] = set() # TODO: Check if this is necessary. + last_finalized_index = self.apps[app_name]["last_finalized_batch"].get( "index", 0 ) @@ -228,44 +304,74 @@ def get_batches( ) finalized_chunk = math.ceil(last_finalized_index / zconfig.SNAPSHOT_CHUNK) if current_chunk != finalized_chunk: - loaded_batches = self.load_finalized_batches(app_name, after + 1) - self.__process_batches(loaded_batches, states, after, batches) - if len(batches) < zconfig.API_BATCHES_LIMIT and next_chunk not in [ + loaded_finalized_batches = self._load_finalized_batches(app_name, after + 1) + self._filter_then_add_the_finalized_batches( + loaded_finalized_batches, + states, + after, + batches_sequence, + batches_hash_set, + ) + + if len(batches_sequence) < zconfig.API_BATCHES_LIMIT and next_chunk not in [ current_chunk, finalized_chunk, ]: - loaded_batches = self.load_finalized_batches( - app_name, after + 1 + len(batches) + loaded_finalized_batches = self._load_finalized_batches( + app_name, after + 1 + len(batches_sequence) + ) + self._filter_then_add_the_finalized_batches( + loaded_finalized_batches, + states, + after, + batches_sequence, + batches_hash_set, ) - self.__process_batches(loaded_batches, states, after, batches) - self.__process_batches(self.apps[app_name]["batches"], states, after, batches) - return batches - def get_batch(self, app_name: str, batch_hash: str) -> dict[str, Any]: + self._filter_then_add_the_finalized_batches( + self.apps[app_name]["operational_batches_sequence"], + states, + after, + batches_sequence, + batches_hash_set, + ) + + return batches_sequence + + def get_batch(self, app_name: str, batch_hash: str) -> Batch: """Get a batch by its hash.""" - return self.apps[app_name]["batches"].get(batch_hash, {}) + if batch_hash in self.apps[app_name]["initialized_batches_map"]: + return self.apps[app_name]["initialized_batches_map"][batch_hash] + elif batch_hash in self.apps[app_name]["operational_batches_hash_index_map"]: + batch_index = self.apps[app_name]["operational_batches_hash_index_map"][ + batch_hash + ] + return self.apps[app_name]["operational_batches_sequence"][ + self._get_batch_relative_index(app_name, batch_index) + ] + return {} - def get_not_finalized_batches(self, app_name: str) -> dict[str, dict[str, Any]]: + def get_still_sequenced_batches(self, app_name: str) -> list[Batch]: """Get batches that are not finalized based on the finalization time border.""" - border: int = int(time.time()) - zconfig.FINALIZATION_TIME_BORDER - batches: dict[str, Any] = self.apps[app_name]["batches"] - return { - batch_hash: batch - for batch_hash, batch in list(batches.items()) - if batch["state"] == "sequenced" and batch["timestamp"] < border - } + border = int(time.time()) - zconfig.FINALIZATION_TIME_BORDER + return [ + batch + for batch in self._filter_operational_batches_sequence( + app_name, self._get_batch_index_interval(app_name, "sequenced") + ) + if batch["timestamp"] < border + ] def init_batches(self, app_name: str, bodies: list[str]) -> None: """Initialize batches of transactions with a given body.""" if not bodies: return - batches: dict[str, Any] = self.apps[app_name]["batches"] + now = int(time.time()) for body in bodies: - now: int = int(time.time()) batch_hash: str = utils.gen_hash(body) - if batch_hash not in batches: - batches[batch_hash] = { + if not self._batch_exists(app_name, batch_hash): + self.apps[app_name]["initialized_batches_map"][batch_hash] = { "app_name": app_name, "node_id": zconfig.NODE["id"], "timestamp": now, @@ -274,28 +380,32 @@ def init_batches(self, app_name: str, bodies: list[str]) -> None: "body": body, } - def get_last_batch(self, app_name: str, state: str) -> dict[str, Any]: + def get_last_batch(self, app_name: str, state: OperationalState) -> Batch: """Get the last batch for a given state.""" - return self.apps.get(app_name, {}).get(f"last_{state}_batch", {}) + match state: + case "sequenced": + return self.apps[app_name]["last_sequenced_batch"] + case "locked": + return self.apps[app_name]["last_locked_batch"] + case "finalized": + return self.apps[app_name]["last_finalized_batch"] def sequencer_init_batches( - self, app_name: str, batches_data: list[dict[str, Any]] + self, app_name: str, initializing_batches: list[Batch] ) -> None: """Initialize and sequence batches.""" - if not batches_data: + if not initializing_batches: return - batches: dict[str, Any] = self.apps[app_name]["batches"] - last_sequenced_batch: dict[str, Any] = self.apps[app_name][ - "last_sequenced_batch" - ] - chaining_hash: str = last_sequenced_batch.get("chaining_hash", "") - index: int = last_sequenced_batch.get("index", 0) + last_sequenced_batch = self.apps[app_name]["last_sequenced_batch"] + chaining_hash = last_sequenced_batch.get("chaining_hash", "") + index = last_sequenced_batch.get("index", 0) - for batch in batches_data: - if batch["hash"] in batches: + for batch in initializing_batches: + if self._batch_exists(app_name, batch["hash"]): continue - batch_hash: str = utils.gen_hash(batch["body"]) + + batch_hash = utils.gen_hash(batch["body"]) if batch["hash"] != batch_hash: zlogger.warning( f"Invalid batch hash: expected {batch_hash} got {batch['hash']}" @@ -311,20 +421,24 @@ def sequencer_init_batches( "chaining_hash": chaining_hash, } ) - batches[batch_hash] = batch + self.apps[app_name]["operational_batches_sequence"].append(batch) + self.apps[app_name]["operational_batches_hash_index_map"][batch_hash] = ( + batch["index"] + ) self.apps[app_name]["last_sequenced_batch"] = batch - def upsert_sequenced_batches(self, app_name: str, batches_data: list[str]) -> None: + def upsert_sequenced_batches( + self, app_name: str, sequenced_batches: list[Batch] + ) -> None: """Upsert sequenced batches.""" - batches: dict[str, Any] = self.apps[app_name]["batches"] - if not batches_data: + if not sequenced_batches: return - chaining_hash: str = self.apps[app_name]["last_sequenced_batch"].get( + chaining_hash = self.apps[app_name]["last_sequenced_batch"].get( "chaining_hash", "" ) - now: int = int(time.time()) - for batch in batches_data: + now = int(time.time()) + for batch in sequenced_batches: if chaining_hash or batch["index"] == 1: chaining_hash = utils.gen_hash(chaining_hash + batch["hash"]) if batch["chaining_hash"] != chaining_hash: @@ -332,74 +446,84 @@ def upsert_sequenced_batches(self, app_name: str, batches_data: list[str]) -> No f"Invalid chaining hash: expected {chaining_hash} got {batch['chaining_hash']}" ) return + batch["state"] = "sequenced" - batch["sequenced_timestamp"] = now - batches[batch["hash"]] = batch + batch["timestamp"] = now + + self.apps[app_name]["initialized_batches_map"].pop(batch["hash"], None) + self.apps[app_name]["operational_batches_sequence"].append(batch) + self.apps[app_name]["operational_batches_hash_index_map"][batch["hash"]] = ( + batch["index"] + ) + if chaining_hash: self.apps[app_name]["last_sequenced_batch"] = batch - def update_locked_batches(self, app_name: str, sig_data: dict[str, Any]) -> None: + def lock_batches(self, app_name: str, signature_data: SignatureData) -> None: """Update batches to 'locked' state up to a specified index.""" - if sig_data["index"] <= self.apps[app_name]["last_locked_batch"].get( + if signature_data["index"] <= self.apps[app_name]["last_locked_batch"].get( "index", 0 ): return - batches: dict[str, Any] = self.apps[app_name]["batches"] - # fixme: sort should be removed after updating batches dict to list - for batch in sorted( - list(batches.values()), key=lambda batch: batch.get("index", 0) + + if ( + signature_data["hash"] + not in self.apps[app_name]["operational_batches_hash_index_map"] ): - if ( - "index" in batch - and batch["index"] <= sig_data["index"] - and batch["state"] != "finalized" - ): - batch["state"] = "locked" - if not batches.get(sig_data["hash"]): return - target_batch: dict[str, Any] = batches[sig_data["hash"]] - target_batch["lock_signature"] = sig_data["signature"] - target_batch["locked_nonsigners"] = sig_data["nonsigners"] - target_batch["locked_tag"] = sig_data["tag"] + + for batch in self._filter_operational_batches_sequence( + app_name, + self._get_batch_index_interval(app_name, state="finalized").complement() + & portion.closed(lower=1, upper=signature_data["index"]), + ): + batch["state"] = "locked" + + target_batch = self._get_operational_batch_by_hash( + app_name, signature_data["hash"] + ) + target_batch["lock_signature"] = signature_data["signature"] + target_batch["locked_nonsigners"] = signature_data["nonsigners"] + target_batch["locked_tag"] = signature_data["tag"] self.apps[app_name]["last_locked_batch"] = target_batch if not self.apps[app_name]["last_sequenced_batch"]: self.apps[app_name]["last_sequenced_batch"] = target_batch - def update_finalized_batches(self, app_name: str, sig_data: dict[str, Any]) -> None: + def finalize_batches(self, app_name: str, signature_data: SignatureData) -> None: """Update batches to 'finalized' state up to a specified index and save snapshots.""" - if sig_data.get("index", 0) <= self.apps[app_name]["last_finalized_batch"].get( - "index", 0 - ): + if signature_data.get("index", 0) <= self.apps[app_name][ + "last_finalized_batch" + ].get("index", 0): return - batches: dict[str, Any] = self.apps[app_name]["batches"] + if ( + signature_data["hash"] + not in self.apps[app_name]["operational_batches_hash_index_map"] + ): + return snapshot_indexes: list[int] = [] - # fixme: sort should be removed after updating batches dict to list - for batch in sorted( - list(batches.values()), key=lambda batch: batch.get("index", 0) + for batch in self._filter_operational_batches_sequence( + app_name, + portion.closed(lower=1, upper=signature_data["index"]), ): - if "index" in batch and batch["index"] <= sig_data["index"]: - batch["state"] = "finalized" - if ( - batch["state"] == "finalized" - and batch["index"] % zconfig.SNAPSHOT_CHUNK == 0 - ): - snapshot_indexes.append(batch["index"]) - - if not batches.get(sig_data["hash"]): - return - target_batch: dict[str, Any] = batches[sig_data["hash"]] - target_batch["finalization_signature"] = sig_data["signature"] - target_batch["finalized_nonsigners"] = sig_data["nonsigners"] - target_batch["finalized_tag"] = sig_data["tag"] + batch["state"] = "finalized" + if batch["index"] % zconfig.SNAPSHOT_CHUNK == 0: + snapshot_indexes.append(batch["index"]) + + target_batch = self._get_operational_batch_by_hash( + app_name, signature_data["hash"] + ) + target_batch["finalization_signature"] = signature_data["signature"] + target_batch["finalized_nonsigners"] = signature_data["nonsigners"] + target_batch["finalized_tag"] = signature_data["tag"] self.apps[app_name]["last_finalized_batch"] = target_batch for snapshot_index in snapshot_indexes: - if snapshot_index <= self.last_stored_index: + if snapshot_index <= self._last_saved_index: continue - self.last_stored_index = snapshot_index - self.save_snapshot(app_name, snapshot_index) + self._last_saved_index = snapshot_index + self._save_snapshot_then_prune(app_name, snapshot_index) def upsert_node_state( self, @@ -421,26 +545,26 @@ def get_nodes_state(self, app_name: str) -> list[dict[str, Any]]: if address in list(zconfig.NODES.keys()) ] - def upsert_locked_sync_point(self, app_name: str, state: dict[str, Any]) -> None: + def upsert_locked_sync_point(self, app_name: str, value: SignatureData) -> None: """Upsert the locked sync point for an app.""" self.apps[app_name]["nodes_state"]["locked_sync_point"] = { - "index": state["index"], - "chaining_hash": state["chaining_hash"], - "hash": state["hash"], - "signature": state["signature"], - "nonsigners": state["nonsigners"], - "tag": state["tag"], + "index": value["index"], + "chaining_hash": value["chaining_hash"], + "hash": value["hash"], + "signature": value["signature"], + "nonsigners": value["nonsigners"], + "tag": value["tag"], } - def upsert_finalized_sync_point(self, app_name: str, state: dict[str, Any]) -> None: + def upsert_finalized_sync_point(self, app_name: str, value: SignatureData) -> None: """Upsert the finalized sync point for an app.""" self.apps[app_name]["nodes_state"]["finalized_sync_point"] = { - "index": state["index"], - "chaining_hash": state["chaining_hash"], - "hash": state["hash"], - "signature": state["signature"], - "nonsigners": state["nonsigners"], - "tag": state["tag"], + "index": value["index"], + "chaining_hash": value["chaining_hash"], + "hash": value["hash"], + "signature": value["signature"], + "nonsigners": value["nonsigners"], + "tag": value["tag"], } def get_locked_sync_point(self, app_name: str) -> dict[str, Any]: @@ -451,107 +575,284 @@ def get_finalized_sync_point(self, app_name: str) -> dict[str, Any]: """Get the finalized sync point for an app.""" return self.apps[app_name]["nodes_state"].get("finalized_sync_point", {}) - def add_missed_batches(self, app_name: str, batches_data: dict[str, Any]) -> None: + def add_missed_batches( + self, app_name: str, missed_batches: Iterable[Batch] + ) -> None: """Add missed batches.""" - self.apps[app_name]["missed_batches"].update(batches_data) + self.apps[app_name]["missed_batches_map"].update( + self._generate_batch_map(missed_batches) + ) - def set_missed_batches(self, app_name: str, batches_data: dict[str, Any]) -> None: + def set_missed_batches( + self, app_name: str, missed_batches: Iterable[Batch] + ) -> None: """set missed batches.""" - self.apps[app_name]["missed_batches"] = batches_data + self.apps[app_name]["missed_batches_map"] = self._generate_batch_map( + missed_batches + ) - def empty_missed_batches(self, app_name: str) -> None: + def clear_missed_batches(self, app_name: str) -> None: """Empty missed batches.""" - self.apps[app_name]["missed_batches"] = {} + self.apps[app_name]["missed_batches_map"] = {} - def get_missed_batches(self, app_name: str) -> dict[str, Any]: + def get_missed_batches(self, app_name: str) -> dict[str, Batch]: """Get missed batches.""" - return self.apps[app_name]["missed_batches"] + return self.apps[app_name]["missed_batches_map"] def has_missed_batches(self) -> bool: """Check if there are missed batches across any app.""" - for app_name in list(zconfig.APPS.keys()): - if self.apps[app_name]["missed_batches"]: - return True - return False + return any( + self.apps[app_name]["missed_batches_map"] + # TODO: Why not simply iterate through the apps? + for app_name in list(zconfig.APPS.keys()) + ) - def reinitialize_db( + def reset_not_finalized_batches_timestamps(self, app_name: str) -> None: + resetting_batches = itertools.chain( + self.apps[app_name]["initialized_batches_map"].values(), + self._filter_operational_batches_sequence( + app_name, + self._get_batch_index_interval( + app_name, state="finalized" + ).complement(), + ), + ) + now = int(time.time()) + for batch in resetting_batches: + batch["timestamp"] = now + + def reinitialize( self, app_name: str, new_sequencer_id: str, - all_nodes_last_finalized_batch: dict[str, Any], + all_nodes_last_finalized_batch: Batch, ): """Reinitialize the database after a switch in the sequencer.""" - self.apps[app_name]["last_sequenced_batch"] = all_nodes_last_finalized_batch - self.apps[app_name]["last_locked_batch"] = all_nodes_last_finalized_batch - self.apps[app_name]["last_finalized_batch"] = all_nodes_last_finalized_batch - self.apps[app_name]["nodes_state"] = {} - self.apps[app_name]["missed_batches"] = {} - # TODO: should get the batches from other nodes if they are missing - self.update_finalized_batches( + self.finalize_batches( app_name, - all_nodes_last_finalized_batch, + signature_data={ + "index": all_nodes_last_finalized_batch["index"], + "chaining_hash": all_nodes_last_finalized_batch["chaining_hash"], + "hash": all_nodes_last_finalized_batch["hash"], + "signature": all_nodes_last_finalized_batch["finalization_signature"], + "nonsigners": all_nodes_last_finalized_batch["finalized_nonsigners"], + "tag": all_nodes_last_finalized_batch["finalized_tag"], + }, ) if zconfig.NODE["id"] == new_sequencer_id: - self.resequence_batches(app_name, all_nodes_last_finalized_batch) + self._resequence_batches(app_name, all_nodes_last_finalized_batch) else: - self.reinitialize_batches(app_name, all_nodes_last_finalized_batch) + self._reinitialize_batches(app_name, all_nodes_last_finalized_batch) + + self.apps[app_name]["nodes_state"] = {} + self.apps[app_name]["missed_batches_map"] = {} - def resequence_batches( - self, app_name: str, all_nodes_last_finalized_batch: dict[str, Any] + def _resequence_batches( + self, app_name: str, all_nodes_last_finalized_batch: Batch ) -> None: """Resequence batches after a switch in the sequencer.""" - keys_to_retain: set[str] = {"app_name", "node_id", "timestamp", "hash", "body"} - index: int = all_nodes_last_finalized_batch.get("index", 0) - chaining_hash: str = all_nodes_last_finalized_batch.get("chaining_hash", "") - - not_finalized_batches: list[Any] = [ - batch - for batch in list(self.apps[app_name]["batches"].values()) - if batch.get("state") != "finalized" - ] - not_finalized_batches.sort(key=lambda x: x.get("index", float("inf"))) - for batch in not_finalized_batches: - filtered_batch = { - key: value for key, value in batch.items() if key in keys_to_retain - } + index = all_nodes_last_finalized_batch["index"] + chaining_hash = all_nodes_last_finalized_batch.get("chaining_hash", "") + resequencing_batches = itertools.chain( + self._filter_operational_batches_sequence( + app_name, + self._get_batch_index_interval(app_name, "finalized").complement(), + ), + self.apps[app_name]["initialized_batches_map"].values(), + ) + resequenced_batches: list[Batch] = [] + for resequencing_batch in resequencing_batches: index += 1 - chaining_hash = utils.gen_hash(chaining_hash + batch["hash"]) - filtered_batch.update( - { - "index": index, - "state": "sequenced", - "chaining_hash": chaining_hash, - } - ) - self.apps[app_name]["batches"][filtered_batch["hash"]] = filtered_batch - self.apps[app_name]["last_sequenced_batch"] = filtered_batch + chaining_hash = utils.gen_hash(chaining_hash + resequencing_batch["hash"]) + resequenced_batch: Batch = { + "app_name": resequencing_batch["app_name"], + "node_id": resequencing_batch["node_id"], + "timestamp": resequencing_batch["timestamp"], + "hash": resequencing_batch["hash"], + "body": resequencing_batch["body"], + "index": index, + "state": "sequenced", + "chaining_hash": chaining_hash, + } + resequenced_batches.append(resequenced_batch) + self.apps[app_name]["operational_batches_hash_index_map"][ + resequenced_batch["hash"] + ] = index + + self.apps[app_name]["operational_batches_sequence"] = self.apps[app_name][ + "operational_batches_sequence" + ][: self.apps[app_name]["last_finalized_batch"].get("index", 0)] + self.apps[app_name]["operational_batches_sequence"].extend(resequenced_batches) + if resequenced_batches: + self.apps[app_name]["last_sequenced_batch"] = resequenced_batches[-1] + self.apps[app_name]["initialized_batches_map"] = {} - def reset_timestamps(self, app_name: str) -> None: - for batch in list(self.apps[app_name]["batches"].values()): - if batch["state"] != "finalized": - batch["timestamp"] = int(time.time()) + self.apps[app_name]["last_sequenced_batch"] = all_nodes_last_finalized_batch + self.apps[app_name]["last_locked_batch"] = all_nodes_last_finalized_batch + self.apps[app_name]["last_finalized_batch"] = all_nodes_last_finalized_batch - def reinitialize_batches( - self, app_name: str, all_nodes_last_finalized_batch: dict[str, Any] + def _reinitialize_batches( + self, app_name: str, all_nodes_last_finalized_batch: Batch ) -> None: """Reinitialize batches after a switch in the sequencer.""" - keys_to_retain: set[str] = { - "app_name", - "node_id", - "timestamp", - "hash", - "body", - } - last_index: int = all_nodes_last_finalized_batch.get("index", 0) - for batch_hash, batch in list(self.apps[app_name]["batches"].items()): - if last_index < batch.get("index", float("inf")): - filtered_batch = { - key: value for key, value in batch.items() if key in keys_to_retain - } - filtered_batch["state"] = "initialized" - self.apps[app_name]["batches"][batch_hash] = filtered_batch + index = all_nodes_last_finalized_batch["index"] + for batch in self._filter_operational_batches_sequence( + app_name, portion.open(index, portion.inf) + ): + reinitialized_batch: Batch = { + "app_name": batch["app_name"], + "node_id": batch["node_id"], + "timestamp": batch["timestamp"], + "hash": batch["hash"], + "body": batch["body"], + "state": "initialized", + } + self.apps[app_name]["initialized_batches_map"][ + reinitialized_batch["hash"] + ] = reinitialized_batch + self.apps[app_name]["operational_batches_hash_index_map"].pop(batch["hash"]) + self.apps[app_name]["operational_batches_sequence"] = self.apps[app_name][ + "operational_batches_sequence" + ][: self._get_batch_relative_index(app_name, index) + 1] + self.apps[app_name]["last_sequenced_batch"] = all_nodes_last_finalized_batch + self.apps[app_name]["last_locked_batch"] = all_nodes_last_finalized_batch + self.apps[app_name]["last_finalized_batch"] = all_nodes_last_finalized_batch + + def _filter_then_add_the_finalized_batches( + self, + loaded_finalized_batches: list[Batch], + states: set[OperationalState], + after: int, + # TODO: In Python, it's very unusual to use parameters as the output of + # a function. + batches_sequence: list[Batch], + batches_hash_set: set[str], + ) -> None: + """Filter and add batches to the result based on state and index.""" + # fixme: sort should be removed after updating batches dict to list + if not loaded_finalized_batches or "finalized" not in states: + return + + first_loaded_batch_index = loaded_finalized_batches[0]["index"] + relative_after = first_loaded_batch_index + after + for batch in loaded_finalized_batches[relative_after + 1 :]: + if len(batches_sequence) >= zconfig.API_BATCHES_LIMIT: + break + + if batch["hash"] in batches_hash_set: + continue + + def _filter_operational_batches_sequence( + self, + app_name: str, + index_interval: portion.Interval, + limit: int | None = None, + ) -> list[Batch]: + index_interval = index_interval.intersection( + portion.closed( + self._get_first_batch_index(app_name), + self._get_last_batch_index(app_name), + ) + ) + relative_index_interval = index_interval.apply( + lambda x: x.replace( + lower=self._get_batch_relative_index(app_name, x.lower), + upper=self._get_batch_relative_index(app_name, x.upper), + ) + ) + return [ + self.apps[app_name]["operational_batches_sequence"][i] + for i in itertools.islice( + portion.iterate(relative_index_interval, step=1), limit + ) + ] + + def _get_operational_batch_by_hash(self, app_name: str, batch_hash: str) -> Batch: + return self._get_operational_batch_by_index( + app_name, + self.apps[app_name]["operational_batches_hash_index_map"][batch_hash], + ) + + def _get_operational_batch_by_index(self, app_name: str, index: int) -> Batch: + return self.apps[app_name]["operational_batches_sequence"][ + self._get_batch_relative_index(app_name, index) + ] + + def _batch_exists(self, app_name: str, batch_hash: str) -> bool: + return ( + batch_hash in self.apps[app_name]["initialized_batches_map"] + or batch_hash in self.apps[app_name]["operational_batches_hash_index_map"] + ) + + def _get_batch_relative_index(self, app_name: str, index: int) -> int: + return index - self._get_first_batch_index(app_name, default=1) + + def _get_batch_index_interval( + self, + app_name: str, + state: OperationalState | None = None, + ) -> portion.Interval: + return portion.closedopen( + self._get_first_batch_index(app_name, state, default=0), + self._get_last_batch_index(app_name, state, default=0), + ) + + def _get_first_batch_index( + self, + app_name: str, + state: OperationalState | None = None, + default: int = 0, + ) -> int: + if self._has_any_operational_batch(app_name, state): + return self.apps[app_name]["operational_batches_sequence"][0]["index"] + else: + return default + + def _get_last_batch_index( + self, + app_name: str, + state: OperationalState | None = None, + default: int = 0, + ) -> int: + if state is None: + batches_sequence = self.apps[app_name]["operational_batches_sequence"] + if batches_sequence: + return batches_sequence[-1]["index"] + else: + return default + else: + return self.get_last_batch(app_name, state).get("index", default) + + def _has_any_batch( + self, + app_name: str, + state: State | None = None, + ) -> bool: + if state is None: + return bool( + self.apps[app_name]["initialized_batches_map"] + ) or self._has_any_operational_batch(app_name, state) + elif state == "initialized": + return bool(self.apps[app_name]["initialized_batches_map"]) + else: + return self._has_any_operational_batch(app_name, state) + + def _has_any_operational_batch( + self, app_name: str, state: OperationalState | None = None + ) -> bool: + return self._get_last_batch_index(app_name, state, default=0) != 0 + + @classmethod + def _generate_batch_map(cls, batches: Iterable[Batch]) -> dict[str, Batch]: + return {batch["hash"]: batch for batch in batches} + + @classmethod + def _generate_batches_hash_index_map( + cls, batches: Iterable[Batch] + ) -> dict[str, int]: + return {batch["hash"]: batch["index"] for batch in batches} zdb: InMemoryDB = InMemoryDB() diff --git a/node/routes.py b/node/routes.py index ac33a4b..aac63f0 100644 --- a/node/routes.py +++ b/node/routes.py @@ -151,28 +151,27 @@ def get_batches(app_name: str, state: str) -> Response: if after < 0: return error_response(ErrorCodes.INVALID_REQUEST, "Invalid after param.") - batches: dict[str, dict] = zdb.get_batches(app_name, { state }, after) - if len(batches) == 0: + batches_sequence = zdb.get_entire_operational_batches_sequence(app_name, { state }, after) + if not batches_sequence: return success_response(data=None) - batches: list[dict] = list(batches.values()) - batches.sort(key = lambda batch: batch["index"]) + for i, batch in enumerate(batches_sequence): + assert batch["index"] == after + i + 1, f'error in getting batches: {batch["index"]} != {after + i + 1}, {i}, {[batch["index"] for batch in batches_sequence]}\n{zdb.apps[app_name]["operational_batches_sequence"]}' - for i in range(len(batches)): - assert batches[i]["index"] == after + i + 1, f'error in getting batches: {batches[i]["index"]} != {after + i + 1}, {i}, {[batch["index"] for batch in batches]}\n{zdb.apps[app_name]["batches"]}' + first_chaining_hash: str = batches_sequence[0]["chaining_hash"] - first_chaining_hash: str = batches[0]["chaining_hash"] - - finalized: dict = {} - for batch in reversed(batches): + finalized = {} + for batch in reversed(batches_sequence): if "finalization_signature" in batch: for k in ("finalization_signature", "index", "hash", "chaining_hash"): finalized[k] = batch[k] finalized["nonsigners"] = batch["finalized_nonsigners"] break - res: dict[str, dict] = { - "batches": [batch["body"] for batch in batches], - "first_chaining_hash": first_chaining_hash, - "finalized": finalized, - } - return success_response(data=res) + + return success_response( + data={ + "batches": [batch["body"] for batch in batches_sequence], + "first_chaining_hash": first_chaining_hash, + "finalized": finalized, + } + ) diff --git a/node/tasks.py b/node/tasks.py index 0631c6d..e2f1d89 100644 --- a/node/tasks.py +++ b/node/tasks.py @@ -13,7 +13,6 @@ import requests from eigensdk.crypto.bls import attestation from typing import List -from utils import NodesRegistryClient from common import bls, utils from common.db import zdb from common.logger import zlogger @@ -26,9 +25,9 @@ def check_finalization() -> None: """Check and add not finalized batches to missed batches.""" for app_name in list(zconfig.APPS.keys()): - not_finalized_batches: dict[str, Any] = zdb.get_not_finalized_batches(app_name) + not_finalized_batches: list[dict[str, Any]] = zdb.get_still_sequenced_batches(app_name) if not_finalized_batches: - zdb.add_missed_batches(app_name=app_name, batches_data=not_finalized_batches) + zdb.add_missed_batches(app_name, not_finalized_batches) def send_batches() -> None: @@ -45,8 +44,8 @@ def send_batches() -> None: def send_app_batches(app_name: str) -> dict[str, Any]: """Send batches for a specific app.""" - initialized_batches: dict[str, Any] = zdb.get_batches( - app_name=app_name, states={"initialized"} + initialized_batches: dict[str, Any] = zdb.get_initialized_batches_map( + app_name=app_name ) last_synced_batch: dict[str, Any] = zdb.get_last_batch( @@ -84,7 +83,7 @@ def send_app_batches(app_name: str) -> dict[str, Any]: if response["error"]["code"] == ErrorCodes.INVALID_NODE_VERSION: zlogger.warning(response["error"]["message"]) return {} - zdb.add_missed_batches(app_name=app_name, batches_data=initialized_batches) + zdb.add_missed_batches(app_name, initialized_batches.values()) return {} sequencer_resp = response["data"] @@ -95,7 +94,7 @@ def send_app_batches(app_name: str) -> dict[str, Any]: ) zdb.is_sequencer_down = False if not censored_batches: - zdb.empty_missed_batches(app_name) + zdb.clear_missed_batches(app_name) seq_tag = max(sequencer_resp["locked"]["tag"], sequencer_resp["finalized"]["tag"]) if seq_tag != 0: @@ -103,7 +102,7 @@ def send_app_batches(app_name: str) -> dict[str, Any]: except Exception: zlogger.exception("An unexpected error occurred:") - zdb.add_missed_batches(app_name=app_name, batches_data=initialized_batches) + zdb.add_missed_batches(app_name, initialized_batches.values()) zdb.is_sequencer_down = True check_finalization() @@ -114,7 +113,7 @@ def sync_with_sequencer( app_name: str, initialized_batches: dict[str, Any], sequencer_response: dict[str, Any] ) -> dict[str, Any]: """Sync batches with the sequencer.""" - zdb.upsert_sequenced_batches(app_name=app_name, batches_data=sequencer_response["batches"]) + zdb.upsert_sequenced_batches(app_name=app_name, sequenced_batches=sequencer_response["batches"]) last_locked_index: str = zdb.apps[app_name]["last_locked_batch"].get("index", 0) if sequencer_response["locked"]["index"] > last_locked_index: if is_sync_point_signature_verified( @@ -127,9 +126,9 @@ def sync_with_sequencer( signature_hex=sequencer_response["locked"]["signature"], nonsigners=sequencer_response["locked"]["nonsigners"], ): - zdb.update_locked_batches( + zdb.lock_batches( app_name=app_name, - sig_data=sequencer_response["locked"], + signature_data=sequencer_response["locked"], ) else: zlogger.error("Invalid locking signature received from sequencer") @@ -146,9 +145,9 @@ def sync_with_sequencer( signature_hex=sequencer_response["finalized"]["signature"], nonsigners=sequencer_response["finalized"]["nonsigners"], ): - zdb.update_finalized_batches( + zdb.finalize_batches( app_name=app_name, - sig_data=sequencer_response["finalized"], + signature_data=sequencer_response["finalized"], ) else: zlogger.error("Invalid finalizing signature received from sequencer") @@ -165,23 +164,23 @@ def check_censorship( ) -> dict[str, Any]: """Check for censorship and update missed batches.""" sequenced_hashes: set[str] = set(batch["hash"] for batch in sequencer_response["batches"]) - censored_batches: dict[str, Any] = { - batch_hash: batch + censored_batches: list[dict[str, Any]] = [ + batch for batch_hash, batch in initialized_batches.items() if batch_hash not in sequenced_hashes - } + ] # remove sequenced batches from the missed batches dict - missed_batches: dict[str, Any] = { - batch_hash: batch - for batch_hash, batch in list(zdb.get_missed_batches(app_name).items()) + missed_batches: list[dict[str, Any]] = [ + batch + for batch_hash, batch in zdb.get_missed_batches(app_name).items() if batch_hash not in sequenced_hashes - } + ] # add censored batches to the missed batches dict - missed_batches.update(censored_batches) + missed_batches += censored_batches - zdb.set_missed_batches(app_name=app_name, batches_data=missed_batches) + zdb.set_missed_batches(app_name=app_name, missed_batches=missed_batches) return censored_batches @@ -389,13 +388,13 @@ def switch_sequencer(old_sequencer_id: str, new_sequencer_id: str) -> bool: all_nodes_last_finalized_batch: dict[str, Any] = ( find_all_nodes_last_finalized_batch(app_name) ) - zdb.reinitialize_db( + zdb.reinitialize( app_name, new_sequencer_id, all_nodes_last_finalized_batch ) if zconfig.NODE['id'] != zconfig.SEQUENCER['id']: time.sleep(10) - zdb.reset_timestamps(app_name) + zdb.reset_not_finalized_batches_timestamps(app_name) zdb.pause_node.clear() return True diff --git a/run.py b/run.py index f5217ca..061892f 100644 --- a/run.py +++ b/run.py @@ -101,4 +101,9 @@ def main() -> None: if __name__ == "__main__": + import debugpy + if "1" in sys.argv or "2" in sys.argv: + debugpy.listen(("0.0.0.0", 5678 + int(sys.argv[1]) - 1)) + debugpy.wait_for_client() + debugpy.breakpoint() main() diff --git a/sequencer/routes.py b/sequencer/routes.py index 3edd2fb..46675d8 100644 --- a/sequencer/routes.py +++ b/sequencer/routes.py @@ -58,28 +58,27 @@ def put_batches() -> Response: def _put_batches(req_data: dict[str, Any]) -> dict[str, Any]: """Process the batches data.""" with zdb.sequencer_put_batches_lock: - zdb.sequencer_init_batches(app_name=req_data["app_name"], batches_data=req_data["batches"]) + zdb.sequencer_init_batches(app_name=req_data["app_name"], initializing_batches=req_data["batches"]) - batches: dict[str, Any] = zdb.get_batches( + batches_sequence = zdb.get_entire_operational_batches_sequence( app_name=req_data["app_name"], states={"sequenced", "locked", "finalized"}, after=req_data["sequenced_index"], ) - batches = sorted(batches.values(), key=lambda x: x["index"]) last_finalized_batch: dict[str, Any] = zdb.get_last_batch( app_name=req_data["app_name"], state="finalized" ) last_locked_batch: dict[str, Any] = zdb.get_last_batch( app_name=req_data["app_name"], state="locked" ) - if batches: - if batches[-1]["index"] < last_finalized_batch.get("index", 0): + if batches_sequence: + if batches_sequence[-1]["index"] < last_finalized_batch.get("index", 0): last_finalized_batch: dict[str, Any] = next( - (d for d in reversed(batches) if "finalization_signature" in d), {} + (d for d in reversed(batches_sequence) if "finalization_signature" in d), {} ) - if batches[-1]["index"] < last_locked_batch.get("index", 0): + if batches_sequence[-1]["index"] < last_locked_batch.get("index", 0): last_locked_batch: dict[str, Any] = next( - (d for d in reversed(batches) if "lock_signature" in d), {} + (d for d in reversed(batches_sequence) if "lock_signature" in d), {} ) zdb.upsert_node_state( @@ -100,7 +99,7 @@ def _put_batches(req_data: dict[str, Any]) -> dict[str, Any]: # txs = {} return { - "batches": batches, + "batches": batches_sequence, "finalized": { "index": last_finalized_batch.get("index", 0), "chaining_hash": last_finalized_batch.get("chaining_hash", ""), diff --git a/sequencer/tasks.py b/sequencer/tasks.py index f000e28..e9a5b5e 100644 --- a/sequencer/tasks.py +++ b/sequencer/tasks.py @@ -93,9 +93,9 @@ async def sync_app(app_name: str) -> None: if lock_signature: locked_data.update(lock_signature) zdb.upsert_locked_sync_point(app_name=app_name, state=locked_data) - zdb.update_locked_batches( + zdb.lock_batches( app_name=app_name, - sig_data=locked_data, + signature_data=locked_data, ) ############################################################ @@ -121,7 +121,7 @@ async def sync_app(app_name: str) -> None: if finalization_signature: finalized_data.update(finalization_signature) zdb.upsert_finalized_sync_point(app_name=app_name, state=finalized_data) - zdb.update_finalized_batches( + zdb.finalize_batches( app_name=app_name, - sig_data=finalized_data, + signature_data=finalized_data, ) From 33cc18306322a23ac31ab1ba150c8aef0506d6ff Mon Sep 17 00:00:00 2001 From: Alireza Roshanzamir Date: Wed, 29 Jan 2025 13:25:55 +0330 Subject: [PATCH 03/13] Fix not filling the finalized batches in the _filter_then_add_the_finalized_batches function. --- common/db.py | 38 +++++++++++++++++++++----------------- sequencer/tasks.py | 16 +++++++++++++--- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/common/db.py b/common/db.py index affc898..9608a09 100644 --- a/common/db.py +++ b/common/db.py @@ -545,26 +545,26 @@ def get_nodes_state(self, app_name: str) -> list[dict[str, Any]]: if address in list(zconfig.NODES.keys()) ] - def upsert_locked_sync_point(self, app_name: str, value: SignatureData) -> None: + def upsert_locked_sync_point(self, app_name: str, signature_data: SignatureData) -> None: """Upsert the locked sync point for an app.""" self.apps[app_name]["nodes_state"]["locked_sync_point"] = { - "index": value["index"], - "chaining_hash": value["chaining_hash"], - "hash": value["hash"], - "signature": value["signature"], - "nonsigners": value["nonsigners"], - "tag": value["tag"], + "index": signature_data["index"], + "chaining_hash": signature_data["chaining_hash"], + "hash": signature_data["hash"], + "signature": signature_data["signature"], + "nonsigners": signature_data["nonsigners"], + "tag": signature_data["tag"], } - def upsert_finalized_sync_point(self, app_name: str, value: SignatureData) -> None: + def upsert_finalized_sync_point(self, app_name: str, signature_data: SignatureData) -> None: """Upsert the finalized sync point for an app.""" self.apps[app_name]["nodes_state"]["finalized_sync_point"] = { - "index": value["index"], - "chaining_hash": value["chaining_hash"], - "hash": value["hash"], - "signature": value["signature"], - "nonsigners": value["nonsigners"], - "tag": value["tag"], + "index": signature_data["index"], + "chaining_hash": signature_data["chaining_hash"], + "hash": signature_data["hash"], + "signature": signature_data["signature"], + "nonsigners": signature_data["nonsigners"], + "tag": signature_data["tag"], } def get_locked_sync_point(self, app_name: str) -> dict[str, Any]: @@ -731,19 +731,23 @@ def _filter_then_add_the_finalized_batches( batches_hash_set: set[str], ) -> None: """Filter and add batches to the result based on state and index.""" - # fixme: sort should be removed after updating batches dict to list if not loaded_finalized_batches or "finalized" not in states: return + # TODO: Remove duplication. first_loaded_batch_index = loaded_finalized_batches[0]["index"] - relative_after = first_loaded_batch_index + after - for batch in loaded_finalized_batches[relative_after + 1 :]: + relative_after = after - first_loaded_batch_index + 1 + + for batch in loaded_finalized_batches[relative_after:]: if len(batches_sequence) >= zconfig.API_BATCHES_LIMIT: break if batch["hash"] in batches_hash_set: continue + batches_sequence.append(batch) + batches_hash_set.add(batch["hash"]) + def _filter_operational_batches_sequence( self, app_name: str, diff --git a/sequencer/tasks.py b/sequencer/tasks.py index e9a5b5e..93e5807 100644 --- a/sequencer/tasks.py +++ b/sequencer/tasks.py @@ -5,7 +5,7 @@ from typing import Any from common import bls -from common.db import zdb +from common.db import zdb, SignatureData from config import zconfig @@ -92,7 +92,12 @@ async def sync_app(app_name: str) -> None: if lock_signature: locked_data.update(lock_signature) - zdb.upsert_locked_sync_point(app_name=app_name, state=locked_data) + locked_data = { + key: value + for key, value in locked_data.items() + if key in SignatureData.__annotations__ + } + zdb.upsert_locked_sync_point(app_name=app_name, signature_data=locked_data) zdb.lock_batches( app_name=app_name, signature_data=locked_data, @@ -120,7 +125,12 @@ async def sync_app(app_name: str) -> None: ) if finalization_signature: finalized_data.update(finalization_signature) - zdb.upsert_finalized_sync_point(app_name=app_name, state=finalized_data) + finalized_data = { + key: value + for key, value in finalized_data.items() + if key in SignatureData.__annotations__ + } + zdb.upsert_finalized_sync_point(app_name=app_name, signature_data=finalized_data) zdb.finalize_batches( app_name=app_name, signature_data=finalized_data, From ae141ac4a6a20d60128925b0f79615227476eabb Mon Sep 17 00:00:00 2001 From: Alireza Roshanzamir Date: Wed, 29 Jan 2025 13:41:45 +0330 Subject: [PATCH 04/13] Remove redundant codes. --- common/db.py | 29 ++++++----------------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/common/db.py b/common/db.py index 9608a09..9ed2a51 100644 --- a/common/db.py +++ b/common/db.py @@ -16,9 +16,6 @@ import portion # type: ignore[import-untyped] from typing import TypedDict, Iterable import typeguard -import debugpy # type: ignore[import-untyped] -import random -import sys State = Literal["initialized", "sequenced", "locked", "finalized"] OperationalState = Literal["sequenced", "locked", "finalized"] @@ -83,30 +80,12 @@ def __new__(cls) -> InMemoryDB: def _initialize(self) -> None: """Initialize the InMemoryDB instance.""" - # if "1" in sys.argv: - # listening_port = 5678 - # zlogger.error(f"Listening on {listening_port}") - # debugpy.listen(("0.0.0.0", listening_port)) - # debugpy.wait_for_client() - # debugpy.breakpoint() - self.sequencer_put_batches_lock = threading.Lock() self.pause_node = threading.Event() self._last_saved_index = 0 self.is_sequencer_down = False self.apps = self._load_finalized_batches_for_all_apps() - # def on_change() -> None: - # for app_name, app in self.apps.items(): - # zlogger.warning( - # "s: {}, l: {}, f: {}".format( - # app.get("last_sequenced_batch", {}).get("index", -1), - # app.get("last_locked_batch", {}).get("index", -1), - # app.get("last_finalized_batch", {}).get("index", -1), - # ) - # ) - # self.apps.bind_callback(on_change) - def _fetch_apps(self) -> None: """Fetchs the apps data.""" data = get_file_content(zconfig.APPS_FILE) @@ -545,7 +524,9 @@ def get_nodes_state(self, app_name: str) -> list[dict[str, Any]]: if address in list(zconfig.NODES.keys()) ] - def upsert_locked_sync_point(self, app_name: str, signature_data: SignatureData) -> None: + def upsert_locked_sync_point( + self, app_name: str, signature_data: SignatureData + ) -> None: """Upsert the locked sync point for an app.""" self.apps[app_name]["nodes_state"]["locked_sync_point"] = { "index": signature_data["index"], @@ -556,7 +537,9 @@ def upsert_locked_sync_point(self, app_name: str, signature_data: SignatureData) "tag": signature_data["tag"], } - def upsert_finalized_sync_point(self, app_name: str, signature_data: SignatureData) -> None: + def upsert_finalized_sync_point( + self, app_name: str, signature_data: SignatureData + ) -> None: """Upsert the finalized sync point for an app.""" self.apps[app_name]["nodes_state"]["finalized_sync_point"] = { "index": signature_data["index"], From a243ad3769cd638f53e9f994bb64e778becfb71c Mon Sep 17 00:00:00 2001 From: Alireza Roshanzamir Date: Wed, 29 Jan 2025 16:25:52 +0330 Subject: [PATCH 05/13] Add the portion library requirement. --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index b01e6ab..168a057 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ eigensdk==0.0.3 requests==2.32.3 pydantic==2.9.2 tenacity==9.0.0 +portion==2.6.0 \ No newline at end of file From 2719693c633bca99c14c7800f61d032c8fbb4d4c Mon Sep 17 00:00:00 2001 From: Alireza Roshanzamir Date: Wed, 29 Jan 2025 16:26:58 +0330 Subject: [PATCH 06/13] Improve some confusing names of InMemoryDB methods. --- common/db.py | 107 ++++++++++++++++++++++---------------------- node/routes.py | 10 ++--- node/tasks.py | 6 +-- sequencer/routes.py | 6 +-- 4 files changed, 64 insertions(+), 65 deletions(-) diff --git a/common/db.py b/common/db.py index 9ed2a51..3ecf343 100644 --- a/common/db.py +++ b/common/db.py @@ -15,12 +15,27 @@ import itertools import portion # type: ignore[import-untyped] from typing import TypedDict, Iterable -import typeguard +import typeguard # TODO: Remove. + State = Literal["initialized", "sequenced", "locked", "finalized"] OperationalState = Literal["sequenced", "locked", "finalized"] +class App(TypedDict, total=False): + # TODO: Annotate the keys and values. + nodes_state: dict[str, Any] + initialized_batches_map: dict[str, Batch] + operational_batches_sequence: list[Batch] + # TODO: Check if it's necessary. + operational_batches_hash_index_map: dict[str, int] + missed_batches_map: dict[str, Batch] + # TODO: Store the batch hash or index instead of the entire batch. + last_sequenced_batch: Batch + last_locked_batch: Batch + last_finalized_batch: Batch + + class Batch(TypedDict, total=False): app_name: str node_id: str @@ -47,21 +62,13 @@ class SignatureData(TypedDict, total=False): tag: int -class App(TypedDict, total=False): - nodes_state: dict[str, Any] - initialized_batches_map: dict[str, Batch] - operational_batches_sequence: list[Batch] - operational_batches_hash_index_map: dict[str, int] - missed_batches_map: dict[str, Batch] - last_sequenced_batch: Batch - last_locked_batch: Batch - last_finalized_batch: Batch - - @typeguard.typechecked class InMemoryDB: """A thread-safe singleton in-memory database class to manage batches of transactions and states for apps.""" + _GLOBAL_FIRST_BATCH_INDEX = 1 + _NOT_SET_BATCH_INDEX = _GLOBAL_FIRST_BATCH_INDEX - 1 + # TODO: We only have one instantiation of this class, why we use locking and singleton here? _instance: InMemoryDB | None = None instance_lock: threading.Lock = threading.Lock() @@ -155,7 +162,6 @@ def _load_finalized_batches_for_all_apps(cls) -> dict[str, App]: finalized_batches ), "missed_batches_map": {}, - # TODO: store batch hash instead of batch "last_sequenced_batch": last_finalized_batch, "last_locked_batch": last_finalized_batch, "last_finalized_batch": last_finalized_batch, @@ -163,7 +169,7 @@ def _load_finalized_batches_for_all_apps(cls) -> dict[str, App]: return result - # NOTE: Unused method. + # TODO: Remove the unused method. @staticmethod def _load_keys() -> dict[str, Any]: """Load keys from the snapshot file.""" @@ -267,7 +273,7 @@ def _prune_initialized_and_old_operational_batches( def get_initialized_batches_map(self, app_name: str) -> dict[str, Batch]: return self.apps[app_name]["initialized_batches_map"] - def get_entire_operational_batches_sequence( + def get_global_operational_batches_sequence( self, app_name: str, states: set[OperationalState], after: int = 0 ) -> list[Batch]: """Get batches filtered by state and optionally by index.""" @@ -326,7 +332,7 @@ def get_batch(self, app_name: str, batch_hash: str) -> Batch: batch_hash ] return self.apps[app_name]["operational_batches_sequence"][ - self._get_batch_relative_index(app_name, batch_index) + self._calculate_relative_index(app_name, batch_index) ] return {} @@ -341,7 +347,7 @@ def get_still_sequenced_batches(self, app_name: str) -> list[Batch]: if batch["timestamp"] < border ] - def init_batches(self, app_name: str, bodies: list[str]) -> None: + def init_batches(self, app_name: str, bodies: Iterable[str]) -> None: """Initialize batches of transactions with a given body.""" if not bodies: return @@ -359,10 +365,12 @@ def init_batches(self, app_name: str, bodies: list[str]) -> None: "body": body, } - def get_last_batch(self, app_name: str, state: OperationalState) -> Batch: + def get_last_operational_batch( + self, app_name: str, state: OperationalState | None + ) -> Batch: """Get the last batch for a given state.""" match state: - case "sequenced": + case "sequenced" | None: return self.apps[app_name]["last_sequenced_batch"] case "locked": return self.apps[app_name]["last_locked_batch"] @@ -563,14 +571,14 @@ def add_missed_batches( ) -> None: """Add missed batches.""" self.apps[app_name]["missed_batches_map"].update( - self._generate_batch_map(missed_batches) + self._generate_batches_map(missed_batches) ) def set_missed_batches( self, app_name: str, missed_batches: Iterable[Batch] ) -> None: """set missed batches.""" - self.apps[app_name]["missed_batches_map"] = self._generate_batch_map( + self.apps[app_name]["missed_batches_map"] = self._generate_batches_map( missed_batches ) @@ -609,7 +617,7 @@ def reinitialize( app_name: str, new_sequencer_id: str, all_nodes_last_finalized_batch: Batch, - ): + ) -> None: """Reinitialize the database after a switch in the sequencer.""" # TODO: should get the batches from other nodes if they are missing self.finalize_batches( @@ -698,7 +706,7 @@ def _reinitialize_batches( self.apps[app_name]["operational_batches_hash_index_map"].pop(batch["hash"]) self.apps[app_name]["operational_batches_sequence"] = self.apps[app_name][ "operational_batches_sequence" - ][: self._get_batch_relative_index(app_name, index) + 1] + ][: self._calculate_relative_index(app_name, index) + 1] self.apps[app_name]["last_sequenced_batch"] = all_nodes_last_finalized_batch self.apps[app_name]["last_locked_batch"] = all_nodes_last_finalized_batch self.apps[app_name]["last_finalized_batch"] = all_nodes_last_finalized_batch @@ -717,11 +725,11 @@ def _filter_then_add_the_finalized_batches( if not loaded_finalized_batches or "finalized" not in states: return - # TODO: Remove duplication. + # TODO: Remove relative index calculation duplication. first_loaded_batch_index = loaded_finalized_batches[0]["index"] - relative_after = after - first_loaded_batch_index + 1 + relative_after = after - first_loaded_batch_index - for batch in loaded_finalized_batches[relative_after:]: + for batch in loaded_finalized_batches[relative_after + 1 :]: if len(batches_sequence) >= zconfig.API_BATCHES_LIMIT: break @@ -737,16 +745,13 @@ def _filter_operational_batches_sequence( index_interval: portion.Interval, limit: int | None = None, ) -> list[Batch]: - index_interval = index_interval.intersection( - portion.closed( - self._get_first_batch_index(app_name), - self._get_last_batch_index(app_name), - ) + feasible_index_interval = index_interval.intersection( + self._get_batch_index_interval(app_name) ) - relative_index_interval = index_interval.apply( + relative_index_interval = feasible_index_interval.apply( lambda x: x.replace( - lower=self._get_batch_relative_index(app_name, x.lower), - upper=self._get_batch_relative_index(app_name, x.upper), + lower=self._calculate_relative_index(app_name, x.lower), + upper=self._calculate_relative_index(app_name, x.upper), ) ) return [ @@ -764,7 +769,7 @@ def _get_operational_batch_by_hash(self, app_name: str, batch_hash: str) -> Batc def _get_operational_batch_by_index(self, app_name: str, index: int) -> Batch: return self.apps[app_name]["operational_batches_sequence"][ - self._get_batch_relative_index(app_name, index) + self._calculate_relative_index(app_name, index) ] def _batch_exists(self, app_name: str, batch_hash: str) -> bool: @@ -773,44 +778,38 @@ def _batch_exists(self, app_name: str, batch_hash: str) -> bool: or batch_hash in self.apps[app_name]["operational_batches_hash_index_map"] ) - def _get_batch_relative_index(self, app_name: str, index: int) -> int: - return index - self._get_first_batch_index(app_name, default=1) + def _calculate_relative_index(self, app_name: str, index: int) -> int: + return index - self._get_first_batch_index(app_name) def _get_batch_index_interval( self, app_name: str, state: OperationalState | None = None, ) -> portion.Interval: - return portion.closedopen( - self._get_first_batch_index(app_name, state, default=0), - self._get_last_batch_index(app_name, state, default=0), + return portion.closed( + self._get_first_batch_index(app_name, state), + self._get_last_batch_index(app_name, state), ) def _get_first_batch_index( self, app_name: str, state: OperationalState | None = None, - default: int = 0, + default: int = _GLOBAL_FIRST_BATCH_INDEX, ) -> int: - if self._has_any_operational_batch(app_name, state): - return self.apps[app_name]["operational_batches_sequence"][0]["index"] - else: + if not self._has_any_operational_batch(app_name, state): return default + return self.apps[app_name]["operational_batches_sequence"][0]["index"] def _get_last_batch_index( self, app_name: str, state: OperationalState | None = None, - default: int = 0, + default: int = _NOT_SET_BATCH_INDEX, ) -> int: - if state is None: - batches_sequence = self.apps[app_name]["operational_batches_sequence"] - if batches_sequence: - return batches_sequence[-1]["index"] - else: - return default - else: - return self.get_last_batch(app_name, state).get("index", default) + if not self._has_any_operational_batch(app_name, state): + return default + return self.get_last_operational_batch(app_name, state).get("index", default) def _has_any_batch( self, @@ -832,7 +831,7 @@ def _has_any_operational_batch( return self._get_last_batch_index(app_name, state, default=0) != 0 @classmethod - def _generate_batch_map(cls, batches: Iterable[Batch]) -> dict[str, Batch]: + def _generate_batches_map(cls, batches: Iterable[Batch]) -> dict[str, Batch]: return {batch["hash"]: batch for batch in batches} @classmethod diff --git a/node/routes.py b/node/routes.py index aac63f0..f2c9be2 100644 --- a/node/routes.py +++ b/node/routes.py @@ -115,9 +115,9 @@ def get_state() -> Response: } for app_name in list(zconfig.APPS.keys()): - last_sequenced_batch = zdb.get_last_batch(app_name, "sequenced") - last_locked_batch = zdb.get_last_batch(app_name, "locked") - last_finalized_batch = zdb.get_last_batch(app_name, "finalized") + last_sequenced_batch = zdb.get_last_operational_batch(app_name, "sequenced") + last_locked_batch = zdb.get_last_operational_batch(app_name, "locked") + last_finalized_batch = zdb.get_last_operational_batch(app_name, "finalized") data['apps'][app_name] = { "last_sequenced_index": last_sequenced_batch.get("index", 0), @@ -136,7 +136,7 @@ def get_last_finalized_batch(app_name: str) -> Response: """Get the last finalized batch for a given app.""" if app_name not in list(zconfig.APPS): return error_response(ErrorCodes.INVALID_REQUEST, "Invalid app name.") - last_finalized_batch: dict[str, Any] = zdb.get_last_batch(app_name, "finalized") + last_finalized_batch: dict[str, Any] = zdb.get_last_operational_batch(app_name, "finalized") return success_response(data=last_finalized_batch) @@ -151,7 +151,7 @@ def get_batches(app_name: str, state: str) -> Response: if after < 0: return error_response(ErrorCodes.INVALID_REQUEST, "Invalid after param.") - batches_sequence = zdb.get_entire_operational_batches_sequence(app_name, { state }, after) + batches_sequence = zdb.get_global_operational_batches_sequence(app_name, { state }, after) if not batches_sequence: return success_response(data=None) diff --git a/node/tasks.py b/node/tasks.py index e2f1d89..53fa1bd 100644 --- a/node/tasks.py +++ b/node/tasks.py @@ -48,10 +48,10 @@ def send_app_batches(app_name: str) -> dict[str, Any]: app_name=app_name ) - last_synced_batch: dict[str, Any] = zdb.get_last_batch( + last_synced_batch: dict[str, Any] = zdb.get_last_operational_batch( app_name=app_name, state="sequenced" ) - last_locked_batch: dict[str, Any] = zdb.get_last_batch( + last_locked_batch: dict[str, Any] = zdb.get_last_operational_batch( app_name=app_name, state="locked" ) @@ -401,7 +401,7 @@ def switch_sequencer(old_sequencer_id: str, new_sequencer_id: str) -> bool: def find_all_nodes_last_finalized_batch(app_name: str) -> dict[str, Any]: """Find the last finalized batch from all nodes.""" - last_finalized_batch: dict[str, Any] = zdb.get_last_batch( + last_finalized_batch: dict[str, Any] = zdb.get_last_operational_batch( app_name=app_name, state="finalized" ) diff --git a/sequencer/routes.py b/sequencer/routes.py index 46675d8..9691eb0 100644 --- a/sequencer/routes.py +++ b/sequencer/routes.py @@ -60,15 +60,15 @@ def _put_batches(req_data: dict[str, Any]) -> dict[str, Any]: with zdb.sequencer_put_batches_lock: zdb.sequencer_init_batches(app_name=req_data["app_name"], initializing_batches=req_data["batches"]) - batches_sequence = zdb.get_entire_operational_batches_sequence( + batches_sequence = zdb.get_global_operational_batches_sequence( app_name=req_data["app_name"], states={"sequenced", "locked", "finalized"}, after=req_data["sequenced_index"], ) - last_finalized_batch: dict[str, Any] = zdb.get_last_batch( + last_finalized_batch: dict[str, Any] = zdb.get_last_operational_batch( app_name=req_data["app_name"], state="finalized" ) - last_locked_batch: dict[str, Any] = zdb.get_last_batch( + last_locked_batch: dict[str, Any] = zdb.get_last_operational_batch( app_name=req_data["app_name"], state="locked" ) if batches_sequence: From f909b0d25083288a908a6b44817eb791679baf76 Mon Sep 17 00:00:00 2001 From: Alireza Roshanzamir Date: Wed, 29 Jan 2025 22:30:12 +0330 Subject: [PATCH 07/13] Use named constants instead of magic numbers. --- common/db.py | 142 +++++++++++++++++++++++++++++++++----------------- node/tasks.py | 4 +- 2 files changed, 97 insertions(+), 49 deletions(-) diff --git a/common/db.py b/common/db.py index 3ecf343..c6ac0d1 100644 --- a/common/db.py +++ b/common/db.py @@ -67,12 +67,18 @@ class InMemoryDB: """A thread-safe singleton in-memory database class to manage batches of transactions and states for apps.""" _GLOBAL_FIRST_BATCH_INDEX = 1 + _BEFORE_GLOBAL_FIRST_BATCH_INDEX = _GLOBAL_FIRST_BATCH_INDEX - 1 _NOT_SET_BATCH_INDEX = _GLOBAL_FIRST_BATCH_INDEX - 1 - # TODO: We only have one instantiation of this class, why we use locking and singleton here? + # TODO: We only have one instantiation of this class, why we use locking and + # singleton here? _instance: InMemoryDB | None = None instance_lock: threading.Lock = threading.Lock() + # TODO: It seems that there are some possible race-condition situations where + # multiple data structures are mutated together, or a chain of methods needs + # to be called atomically. + def __new__(cls) -> InMemoryDB: """Singleton pattern implementation to ensure only one instance exists.""" with cls.instance_lock: @@ -89,7 +95,7 @@ def _initialize(self) -> None: """Initialize the InMemoryDB instance.""" self.sequencer_put_batches_lock = threading.Lock() self.pause_node = threading.Event() - self._last_saved_index = 0 + self._last_saved_index = self._BEFORE_GLOBAL_FIRST_BATCH_INDEX self.is_sequencer_down = False self.apps = self._load_finalized_batches_for_all_apps() @@ -173,7 +179,7 @@ def _load_finalized_batches_for_all_apps(cls) -> dict[str, App]: @staticmethod def _load_keys() -> dict[str, Any]: """Load keys from the snapshot file.""" - keys_path: str = os.path.join(zconfig.SNAPSHOT_PATH, "keys.json.gz") + keys_path = os.path.join(zconfig.SNAPSHOT_PATH, "keys.json.gz") try: with gzip.open(keys_path, "rt", encoding="UTF-8") as file: return json.load(file) @@ -219,7 +225,10 @@ def _load_finalized_batches( return [] def _save_snapshot_then_prune(self, app_name: str, index: int) -> None: - """Save a snapshot of the finalized batches to a file.""" + """ + Save a snapshot of the finalized batches to a file and prune old \ + initialized and operational batches. + """ snapshot_border = index - zconfig.SNAPSHOT_CHUNK remove_border = max( index - zconfig.SNAPSHOT_CHUNK * zconfig.REMOVE_CHUNK_BORDER, 0 @@ -274,14 +283,17 @@ def get_initialized_batches_map(self, app_name: str) -> dict[str, Batch]: return self.apps[app_name]["initialized_batches_map"] def get_global_operational_batches_sequence( - self, app_name: str, states: set[OperationalState], after: int = 0 + self, + app_name: str, + states: set[OperationalState], + after: int = _BEFORE_GLOBAL_FIRST_BATCH_INDEX, ) -> list[Batch]: """Get batches filtered by state and optionally by index.""" batches_sequence: list[Batch] = [] batches_hash_set: set[str] = set() # TODO: Check if this is necessary. last_finalized_index = self.apps[app_name]["last_finalized_batch"].get( - "index", 0 + "index", self._BEFORE_GLOBAL_FIRST_BATCH_INDEX ) current_chunk = math.ceil((after + 1) / zconfig.SNAPSHOT_CHUNK) next_chunk = math.ceil( @@ -328,12 +340,7 @@ def get_batch(self, app_name: str, batch_hash: str) -> Batch: if batch_hash in self.apps[app_name]["initialized_batches_map"]: return self.apps[app_name]["initialized_batches_map"][batch_hash] elif batch_hash in self.apps[app_name]["operational_batches_hash_index_map"]: - batch_index = self.apps[app_name]["operational_batches_hash_index_map"][ - batch_hash - ] - return self.apps[app_name]["operational_batches_sequence"][ - self._calculate_relative_index(app_name, batch_index) - ] + return self._get_operational_batch_by_hash(app_name, batch_hash) return {} def get_still_sequenced_batches(self, app_name: str) -> list[Batch]: @@ -354,7 +361,7 @@ def init_batches(self, app_name: str, bodies: Iterable[str]) -> None: now = int(time.time()) for body in bodies: - batch_hash: str = utils.gen_hash(body) + batch_hash = utils.gen_hash(body) if not self._batch_exists(app_name, batch_hash): self.apps[app_name]["initialized_batches_map"][batch_hash] = { "app_name": app_name, @@ -386,7 +393,7 @@ def sequencer_init_batches( last_sequenced_batch = self.apps[app_name]["last_sequenced_batch"] chaining_hash = last_sequenced_batch.get("chaining_hash", "") - index = last_sequenced_batch.get("index", 0) + index = last_sequenced_batch.get("index", self._GLOBAL_FIRST_BATCH_INDEX) for batch in initializing_batches: if self._batch_exists(app_name, batch["hash"]): @@ -399,7 +406,6 @@ def sequencer_init_batches( ) continue - index += 1 chaining_hash = utils.gen_hash(chaining_hash + batch_hash) batch.update( { @@ -408,11 +414,13 @@ def sequencer_init_batches( "chaining_hash": chaining_hash, } ) + self.apps[app_name]["operational_batches_sequence"].append(batch) self.apps[app_name]["operational_batches_hash_index_map"][batch_hash] = ( batch["index"] ) self.apps[app_name]["last_sequenced_batch"] = batch + index += 1 def upsert_sequenced_batches( self, app_name: str, sequenced_batches: list[Batch] @@ -426,7 +434,7 @@ def upsert_sequenced_batches( ) now = int(time.time()) for batch in sequenced_batches: - if chaining_hash or batch["index"] == 1: + if chaining_hash or batch["index"] == self._GLOBAL_FIRST_BATCH_INDEX: chaining_hash = utils.gen_hash(chaining_hash + batch["hash"]) if batch["chaining_hash"] != chaining_hash: zlogger.warning( @@ -449,7 +457,7 @@ def upsert_sequenced_batches( def lock_batches(self, app_name: str, signature_data: SignatureData) -> None: """Update batches to 'locked' state up to a specified index.""" if signature_data["index"] <= self.apps[app_name]["last_locked_batch"].get( - "index", 0 + "index", self._BEFORE_GLOBAL_FIRST_BATCH_INDEX ): return @@ -462,7 +470,9 @@ def lock_batches(self, app_name: str, signature_data: SignatureData) -> None: for batch in self._filter_operational_batches_sequence( app_name, self._get_batch_index_interval(app_name, state="finalized").complement() - & portion.closed(lower=1, upper=signature_data["index"]), + & portion.closed( + lower=self._GLOBAL_FIRST_BATCH_INDEX, upper=signature_data["index"] + ), ): batch["state"] = "locked" @@ -478,9 +488,11 @@ def lock_batches(self, app_name: str, signature_data: SignatureData) -> None: def finalize_batches(self, app_name: str, signature_data: SignatureData) -> None: """Update batches to 'finalized' state up to a specified index and save snapshots.""" - if signature_data.get("index", 0) <= self.apps[app_name][ - "last_finalized_batch" - ].get("index", 0): + if signature_data.get( + "index", self._BEFORE_GLOBAL_FIRST_BATCH_INDEX + ) <= self.apps[app_name]["last_finalized_batch"].get( + "index", self._BEFORE_GLOBAL_FIRST_BATCH_INDEX + ): return if ( @@ -492,7 +504,9 @@ def finalize_batches(self, app_name: str, signature_data: SignatureData) -> None snapshot_indexes: list[int] = [] for batch in self._filter_operational_batches_sequence( app_name, - portion.closed(lower=1, upper=signature_data["index"]), + portion.closed( + lower=self._GLOBAL_FIRST_BATCH_INDEX, upper=signature_data["index"] + ), ): batch["state"] = "finalized" if batch["index"] % zconfig.SNAPSHOT_CHUNK == 0: @@ -586,7 +600,7 @@ def clear_missed_batches(self, app_name: str) -> None: """Empty missed batches.""" self.apps[app_name]["missed_batches_map"] = {} - def get_missed_batches(self, app_name: str) -> dict[str, Batch]: + def get_missed_batches_map(self, app_name: str) -> dict[str, Batch]: """Get missed batches.""" return self.apps[app_name]["missed_batches_map"] @@ -619,7 +633,7 @@ def reinitialize( all_nodes_last_finalized_batch: Batch, ) -> None: """Reinitialize the database after a switch in the sequencer.""" - # TODO: should get the batches from other nodes if they are missing + # TODO: Should get the batches from other nodes if they are missing. self.finalize_batches( app_name, signature_data={ @@ -644,18 +658,23 @@ def _resequence_batches( self, app_name: str, all_nodes_last_finalized_batch: Batch ) -> None: """Resequence batches after a switch in the sequencer.""" - index = all_nodes_last_finalized_batch["index"] + index = all_nodes_last_finalized_batch.get( + "index", self._GLOBAL_FIRST_BATCH_INDEX + ) chaining_hash = all_nodes_last_finalized_batch.get("chaining_hash", "") resequencing_batches = itertools.chain( self._filter_operational_batches_sequence( app_name, + # TODO: I guess we should resequence all batches until the index of + # the last finalized batch across all nodes instead of all locally + # non-finalized batches, as the fully finalized batch across all nodes + # may differ from the local last finalized batch. self._get_batch_index_interval(app_name, "finalized").complement(), ), self.apps[app_name]["initialized_batches_map"].values(), ) resequenced_batches: list[Batch] = [] for resequencing_batch in resequencing_batches: - index += 1 chaining_hash = utils.gen_hash(chaining_hash + resequencing_batch["hash"]) resequenced_batch: Batch = { "app_name": resequencing_batch["app_name"], @@ -671,16 +690,19 @@ def _resequence_batches( self.apps[app_name]["operational_batches_hash_index_map"][ resequenced_batch["hash"] ] = index + index += 1 - self.apps[app_name]["operational_batches_sequence"] = self.apps[app_name][ - "operational_batches_sequence" - ][: self.apps[app_name]["last_finalized_batch"].get("index", 0)] - self.apps[app_name]["operational_batches_sequence"].extend(resequenced_batches) + self.apps[app_name]["initialized_batches_map"] = {} + self.apps[app_name]["operational_batches_sequence"] = ( + self._filter_operational_batches_sequence( + app_name, self._get_batch_index_interval(app_name, "finalized") + ) + + resequenced_batches + ) if resequenced_batches: self.apps[app_name]["last_sequenced_batch"] = resequenced_batches[-1] - self.apps[app_name]["initialized_batches_map"] = {} - - self.apps[app_name]["last_sequenced_batch"] = all_nodes_last_finalized_batch + else: + self.apps[app_name]["last_sequenced_batch"] = all_nodes_last_finalized_batch self.apps[app_name]["last_locked_batch"] = all_nodes_last_finalized_batch self.apps[app_name]["last_finalized_batch"] = all_nodes_last_finalized_batch @@ -688,9 +710,11 @@ def _reinitialize_batches( self, app_name: str, all_nodes_last_finalized_batch: Batch ) -> None: """Reinitialize batches after a switch in the sequencer.""" - index = all_nodes_last_finalized_batch["index"] + all_nodes_last_finalized_batch_index = all_nodes_last_finalized_batch.get( + "index", self._BEFORE_GLOBAL_FIRST_BATCH_INDEX + ) for batch in self._filter_operational_batches_sequence( - app_name, portion.open(index, portion.inf) + app_name, portion.open(all_nodes_last_finalized_batch_index, portion.inf) ): reinitialized_batch: Batch = { "app_name": batch["app_name"], @@ -701,12 +725,18 @@ def _reinitialize_batches( "state": "initialized", } self.apps[app_name]["initialized_batches_map"][ - reinitialized_batch["hash"] + batch["hash"] ] = reinitialized_batch self.apps[app_name]["operational_batches_hash_index_map"].pop(batch["hash"]) - self.apps[app_name]["operational_batches_sequence"] = self.apps[app_name][ - "operational_batches_sequence" - ][: self._calculate_relative_index(app_name, index) + 1] + + self.apps[app_name]["operational_batches_sequence"] = ( + self._filter_operational_batches_sequence( + app_name, + portion.closed( + self._GLOBAL_FIRST_BATCH_INDEX, all_nodes_last_finalized_batch_index + ), + ) + ) self.apps[app_name]["last_sequenced_batch"] = all_nodes_last_finalized_batch self.apps[app_name]["last_locked_batch"] = all_nodes_last_finalized_batch self.apps[app_name]["last_finalized_batch"] = all_nodes_last_finalized_batch @@ -779,7 +809,9 @@ def _batch_exists(self, app_name: str, batch_hash: str) -> bool: ) def _calculate_relative_index(self, app_name: str, index: int) -> int: - return index - self._get_first_batch_index(app_name) + return index - self._get_first_batch_index( + app_name, default=self._GLOBAL_FIRST_BATCH_INDEX + ) def _get_batch_index_interval( self, @@ -787,15 +819,19 @@ def _get_batch_index_interval( state: OperationalState | None = None, ) -> portion.Interval: return portion.closed( - self._get_first_batch_index(app_name, state), - self._get_last_batch_index(app_name, state), + self._get_first_batch_index( + app_name, state, default=self._GLOBAL_FIRST_BATCH_INDEX + ), + self._get_last_batch_index( + app_name, state, default=self._BEFORE_GLOBAL_FIRST_BATCH_INDEX + ), ) def _get_first_batch_index( self, app_name: str, state: OperationalState | None = None, - default: int = _GLOBAL_FIRST_BATCH_INDEX, + default: int = _NOT_SET_BATCH_INDEX, ) -> int: if not self._has_any_operational_batch(app_name, state): return default @@ -807,9 +843,16 @@ def _get_last_batch_index( state: OperationalState | None = None, default: int = _NOT_SET_BATCH_INDEX, ) -> int: - if not self._has_any_operational_batch(app_name, state): - return default - return self.get_last_operational_batch(app_name, state).get("index", default) + if state is None: + batches_sequence = self.apps[app_name]["operational_batches_sequence"] + if batches_sequence: + return batches_sequence[-1]["index"] + else: + return default + else: + return self.get_last_operational_batch(app_name, state).get( + "index", default + ) def _has_any_batch( self, @@ -828,7 +871,12 @@ def _has_any_batch( def _has_any_operational_batch( self, app_name: str, state: OperationalState | None = None ) -> bool: - return self._get_last_batch_index(app_name, state, default=0) != 0 + return ( + self._get_last_batch_index( + app_name, state, default=self._NOT_SET_BATCH_INDEX + ) + != self._NOT_SET_BATCH_INDEX + ) @classmethod def _generate_batches_map(cls, batches: Iterable[Batch]) -> dict[str, Batch]: diff --git a/node/tasks.py b/node/tasks.py index 53fa1bd..f0a17f0 100644 --- a/node/tasks.py +++ b/node/tasks.py @@ -173,7 +173,7 @@ def check_censorship( # remove sequenced batches from the missed batches dict missed_batches: list[dict[str, Any]] = [ batch - for batch_hash, batch in zdb.get_missed_batches(app_name).items() + for batch_hash, batch in zdb.get_missed_batches_map(app_name).items() if batch_hash not in sequenced_hashes ] @@ -292,7 +292,7 @@ async def send_dispute_requests() -> None: apps_missed_batches: dict[str, Any] = {} for app_name in list(zconfig.APPS.keys()): - app_missed_batches = zdb.get_missed_batches(app_name) + app_missed_batches = zdb.get_missed_batches_map(app_name) if len(app_missed_batches) > 0: apps_missed_batches[app_name] = app_missed_batches From 76cd7fb8dabe874ac7d34cc4270ec45292e02e26 Mon Sep 17 00:00:00 2001 From: Alireza Roshanzamir Date: Wed, 29 Jan 2025 22:57:14 +0330 Subject: [PATCH 08/13] Fix some indexing issues. --- common/db.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/common/db.py b/common/db.py index c6ac0d1..cc918ae 100644 --- a/common/db.py +++ b/common/db.py @@ -393,7 +393,7 @@ def sequencer_init_batches( last_sequenced_batch = self.apps[app_name]["last_sequenced_batch"] chaining_hash = last_sequenced_batch.get("chaining_hash", "") - index = last_sequenced_batch.get("index", self._GLOBAL_FIRST_BATCH_INDEX) + index = last_sequenced_batch.get("index", self._BEFORE_GLOBAL_FIRST_BATCH_INDEX) for batch in initializing_batches: if self._batch_exists(app_name, batch["hash"]): @@ -406,6 +406,7 @@ def sequencer_init_batches( ) continue + index += 1 chaining_hash = utils.gen_hash(chaining_hash + batch_hash) batch.update( { @@ -420,7 +421,6 @@ def sequencer_init_batches( batch["index"] ) self.apps[app_name]["last_sequenced_batch"] = batch - index += 1 def upsert_sequenced_batches( self, app_name: str, sequenced_batches: list[Batch] @@ -659,7 +659,7 @@ def _resequence_batches( ) -> None: """Resequence batches after a switch in the sequencer.""" index = all_nodes_last_finalized_batch.get( - "index", self._GLOBAL_FIRST_BATCH_INDEX + "index", self._BEFORE_GLOBAL_FIRST_BATCH_INDEX ) chaining_hash = all_nodes_last_finalized_batch.get("chaining_hash", "") resequencing_batches = itertools.chain( @@ -675,6 +675,7 @@ def _resequence_batches( ) resequenced_batches: list[Batch] = [] for resequencing_batch in resequencing_batches: + index += 1 chaining_hash = utils.gen_hash(chaining_hash + resequencing_batch["hash"]) resequenced_batch: Batch = { "app_name": resequencing_batch["app_name"], @@ -690,7 +691,6 @@ def _resequence_batches( self.apps[app_name]["operational_batches_hash_index_map"][ resequenced_batch["hash"] ] = index - index += 1 self.apps[app_name]["initialized_batches_map"] = {} self.apps[app_name]["operational_batches_sequence"] = ( From 8c84ffe84dac77a2ee4a6337bcbe61309710598f Mon Sep 17 00:00:00 2001 From: Alireza Roshanzamir Date: Fri, 31 Jan 2025 22:25:36 +0330 Subject: [PATCH 09/13] Retrieve only the sequenced batches in the get_still_sequenced_batches method. --- common/db.py | 45 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/common/db.py b/common/db.py index cc918ae..d1f6feb 100644 --- a/common/db.py +++ b/common/db.py @@ -15,7 +15,6 @@ import itertools import portion # type: ignore[import-untyped] from typing import TypedDict, Iterable -import typeguard # TODO: Remove. State = Literal["initialized", "sequenced", "locked", "finalized"] @@ -62,7 +61,6 @@ class SignatureData(TypedDict, total=False): tag: int -@typeguard.typechecked class InMemoryDB: """A thread-safe singleton in-memory database class to manage batches of transactions and states for apps.""" @@ -133,13 +131,13 @@ def _fetch_apps_and_network_state_periodically(self) -> None: try: self._fetch_apps() except: - zlogger.error("An unexpected error occurred while fetching apps data") + zlogger.error("An unexpected error occurred while fetching apps data.") try: zconfig.fetch_network_state() except: zlogger.error( - "An unexpected error occurred while fetching network state" + "An unexpected error occurred while fetching network state." ) time.sleep(zconfig.FETCH_APPS_AND_NODES_INTERVAL) @@ -269,7 +267,7 @@ def _prune_initialized_and_old_operational_batches( self.apps[app_name]["operational_batches_sequence"] = ( self._filter_operational_batches_sequence( app_name, - self._get_batch_index_interval(app_name, state="finalized").complement() + self._get_batch_index_interval(app_name, "finalized").complement() | portion.open(remove_border, portion.inf), ) ) @@ -349,7 +347,10 @@ def get_still_sequenced_batches(self, app_name: str) -> list[Batch]: return [ batch for batch in self._filter_operational_batches_sequence( - app_name, self._get_batch_index_interval(app_name, "sequenced") + app_name, + self._get_batch_index_interval( + app_name, "sequenced", exclude_next_states=True + ), ) if batch["timestamp"] < border ] @@ -469,7 +470,7 @@ def lock_batches(self, app_name: str, signature_data: SignatureData) -> None: for batch in self._filter_operational_batches_sequence( app_name, - self._get_batch_index_interval(app_name, state="finalized").complement() + self._get_batch_index_interval(app_name, "finalized").complement() & portion.closed( lower=self._GLOBAL_FIRST_BATCH_INDEX, upper=signature_data["index"] ), @@ -617,9 +618,7 @@ def reset_not_finalized_batches_timestamps(self, app_name: str) -> None: self.apps[app_name]["initialized_batches_map"].values(), self._filter_operational_batches_sequence( app_name, - self._get_batch_index_interval( - app_name, state="finalized" - ).complement(), + self._get_batch_index_interval(app_name, "finalized").complement(), ), ) now = int(time.time()) @@ -817,8 +816,9 @@ def _get_batch_index_interval( self, app_name: str, state: OperationalState | None = None, + exclude_next_states: bool = False, ) -> portion.Interval: - return portion.closed( + result = portion.closed( self._get_first_batch_index( app_name, state, default=self._GLOBAL_FIRST_BATCH_INDEX ), @@ -827,6 +827,17 @@ def _get_batch_index_interval( ), ) + if ( + state is not None + and exclude_next_states + and (next_state := self._get_next_operational_state(state)) + ): + result -= self._get_batch_index_interval( + app_name, next_state, exclude_next_states=False + ) + + return result + def _get_first_batch_index( self, app_name: str, @@ -888,5 +899,17 @@ def _generate_batches_hash_index_map( ) -> dict[str, int]: return {batch["hash"]: batch["index"] for batch in batches} + @classmethod + def _get_next_operational_state( + cls, state: OperationalState + ) -> OperationalState | None: + match state: + case "sequenced": + return "locked" + case "locked": + return "finalized" + case "finalized": + return None + zdb: InMemoryDB = InMemoryDB() From c4edfa44bb4ca35fa20ff0eede546b7919477d3d Mon Sep 17 00:00:00 2001 From: Alireza Roshanzamir Date: Sat, 1 Feb 2025 09:38:03 +0330 Subject: [PATCH 10/13] Do not prune initialized batch. --- common/db.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/common/db.py b/common/db.py index d1f6feb..a886717 100644 --- a/common/db.py +++ b/common/db.py @@ -233,7 +233,7 @@ def _save_snapshot_then_prune(self, app_name: str, index: int) -> None: ) try: self._save_finalized_batches(app_name, index, snapshot_border) - self._prune_initialized_and_old_operational_batches(app_name, remove_border) + self._prune_old_finalized_batches(app_name, remove_border) except Exception as error: zlogger.exception( "An error occurred while saving snapshot for %s at index %d: %s", @@ -259,11 +259,8 @@ def _save_finalized_batches( file, ) - def _prune_initialized_and_old_operational_batches( - self, app_name: str, remove_border: int - ) -> None: + def _prune_old_finalized_batches(self, app_name: str, remove_border: int) -> None: """Helper function to prune old batches from memory.""" - self.apps[app_name]["initialized_batches_map"] = {} self.apps[app_name]["operational_batches_sequence"] = ( self._filter_operational_batches_sequence( app_name, From a38f26db3e7b8406993fa95127a2e40afb0ce718 Mon Sep 17 00:00:00 2001 From: Alireza Roshanzamir Date: Sat, 1 Feb 2025 11:25:18 +0330 Subject: [PATCH 11/13] Copy the initialized_batches_map to prevent changes during processing. --- common/db.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/common/db.py b/common/db.py index a886717..3f29ea4 100644 --- a/common/db.py +++ b/common/db.py @@ -275,7 +275,9 @@ def _prune_old_finalized_batches(self, app_name: str, remove_border: int) -> Non ) def get_initialized_batches_map(self, app_name: str) -> dict[str, Batch]: - return self.apps[app_name]["initialized_batches_map"] + # NOTE: We copy the dictionary in order to make it safe to work on it + # without the fear of change in the middle of processing. + return self.apps[app_name]["initialized_batches_map"].copy() def get_global_operational_batches_sequence( self, From fcb573507613e9abdb3c5923c00f3ff091beb56c Mon Sep 17 00:00:00 2001 From: Alireza Roshanzamir Date: Sat, 1 Feb 2025 11:55:54 +0330 Subject: [PATCH 12/13] Remove the debug server setup in the run.py file. --- run.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/run.py b/run.py index 061892f..f5217ca 100644 --- a/run.py +++ b/run.py @@ -101,9 +101,4 @@ def main() -> None: if __name__ == "__main__": - import debugpy - if "1" in sys.argv or "2" in sys.argv: - debugpy.listen(("0.0.0.0", 5678 + int(sys.argv[1]) - 1)) - debugpy.wait_for_client() - debugpy.breakpoint() main() From 4283bec7d05c20e252e6029c5b7830b30fd612ab Mon Sep 17 00:00:00 2001 From: Alireza Roshanzamir Date: Wed, 5 Feb 2025 23:12:11 +0330 Subject: [PATCH 13/13] Apply the PR comments. --- common/db.py | 211 ++++++++++++++++++++------------------------ node/routes.py | 2 +- sequencer/routes.py | 1 - 3 files changed, 98 insertions(+), 116 deletions(-) diff --git a/common/db.py b/common/db.py index 3f29ea4..fb198aa 100644 --- a/common/db.py +++ b/common/db.py @@ -14,7 +14,8 @@ from typing import Literal import itertools import portion # type: ignore[import-untyped] -from typing import TypedDict, Iterable +from typing import TypedDict +from collections.abc import Iterable State = Literal["initialized", "sequenced", "locked", "finalized"] @@ -62,40 +63,27 @@ class SignatureData(TypedDict, total=False): class InMemoryDB: - """A thread-safe singleton in-memory database class to manage batches of transactions and states for apps.""" + """In-memory database class to manage batches of transactions and states for apps.""" _GLOBAL_FIRST_BATCH_INDEX = 1 _BEFORE_GLOBAL_FIRST_BATCH_INDEX = _GLOBAL_FIRST_BATCH_INDEX - 1 _NOT_SET_BATCH_INDEX = _GLOBAL_FIRST_BATCH_INDEX - 1 - # TODO: We only have one instantiation of this class, why we use locking and - # singleton here? - _instance: InMemoryDB | None = None - instance_lock: threading.Lock = threading.Lock() - # TODO: It seems that there are some possible race-condition situations where # multiple data structures are mutated together, or a chain of methods needs # to be called atomically. - def __new__(cls) -> InMemoryDB: - """Singleton pattern implementation to ensure only one instance exists.""" - with cls.instance_lock: - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._initialize() - fetching_thread = Thread( - target=cls._instance._fetch_apps_and_network_state_periodically - ) - fetching_thread.start() - return cls._instance - - def _initialize(self) -> None: + def __init__(self) -> None: """Initialize the InMemoryDB instance.""" self.sequencer_put_batches_lock = threading.Lock() self.pause_node = threading.Event() self._last_saved_index = self._BEFORE_GLOBAL_FIRST_BATCH_INDEX self.is_sequencer_down = False self.apps = self._load_finalized_batches_for_all_apps() + self._fetching_thread = Thread( + target=self._fetch_apps_and_network_state_periodically + ) + self._fetching_thread.start() def _fetch_apps(self) -> None: """Fetchs the apps data.""" @@ -150,14 +138,7 @@ def _load_finalized_batches_for_all_apps(cls) -> dict[str, App]: # TODO: Replace with dot operator. for app_name in getattr(zconfig, "APPS", []): finalized_batches = cls._load_finalized_batches(app_name) - last_finalized_batch: Batch = next( - ( - batch - for batch in reversed(finalized_batches) - if batch.get("finalization_signature") - ), - {}, - ) + last_finalized_batch = finalized_batches[-1] if finalized_batches else {} result[app_name] = { "nodes_state": {}, "initialized_batches_map": {}, @@ -222,50 +203,53 @@ def _load_finalized_batches( ) return [] - def _save_snapshot_then_prune(self, app_name: str, index: int) -> None: - """ - Save a snapshot of the finalized batches to a file and prune old \ - initialized and operational batches. - """ - snapshot_border = index - zconfig.SNAPSHOT_CHUNK - remove_border = max( - index - zconfig.SNAPSHOT_CHUNK * zconfig.REMOVE_CHUNK_BORDER, 0 - ) + def _save_finalized_chunk_then_prune( + self, app_name: str, snapshot_index: int + ) -> None: try: - self._save_finalized_batches(app_name, index, snapshot_border) - self._prune_old_finalized_batches(app_name, remove_border) + snapshot_border_index = max( + snapshot_index - zconfig.SNAPSHOT_CHUNK, + self._BEFORE_GLOBAL_FIRST_BATCH_INDEX, + ) + self._save_finalized_batches_chunk_to_file( + app_name, + border_index=snapshot_border_index, + end_index=snapshot_index, + ) + + remove_border_index = max( + snapshot_index - zconfig.SNAPSHOT_CHUNK * zconfig.REMOVE_CHUNK_BORDER, + self._BEFORE_GLOBAL_FIRST_BATCH_INDEX, + ) + self._prune_old_finalized_batches(app_name, remove_border_index) except Exception as error: zlogger.exception( "An error occurred while saving snapshot for %s at index %d: %s", app_name, - index, + snapshot_index, error, ) - def _save_finalized_batches( - self, app_name: str, index: int, snapshot_border: int + def _save_finalized_batches_chunk_to_file( + self, app_name: str, border_index: int, end_index: int ) -> None: - """Helper function to save batches to a snapshot file.""" snapshot_dir = os.path.join(zconfig.SNAPSHOT_PATH, zconfig.VERSION, app_name) with gzip.open( - snapshot_dir + f"/{str(index).zfill(7)}.json.gz", "wt", encoding="UTF-8" + snapshot_dir + f"/{str(end_index).zfill(7)}.json.gz", "wt", encoding="UTF-8" ) as file: json.dump( self._filter_operational_batches_sequence( app_name, - self._get_batch_index_interval(app_name, "finalized") - & portion.openclosed(snapshot_border, index), + portion.openclosed(border_index, end_index), ), file, ) - def _prune_old_finalized_batches(self, app_name: str, remove_border: int) -> None: - """Helper function to prune old batches from memory.""" + def _prune_old_finalized_batches(self, app_name: str, border_index: int) -> None: self.apps[app_name]["operational_batches_sequence"] = ( self._filter_operational_batches_sequence( app_name, - self._get_batch_index_interval(app_name, "finalized").complement() - | portion.open(remove_border, portion.inf), + portion.open(border_index, portion.inf), ) ) self.apps[app_name]["operational_batches_hash_index_map"] = ( @@ -282,49 +266,54 @@ def get_initialized_batches_map(self, app_name: str) -> dict[str, Batch]: def get_global_operational_batches_sequence( self, app_name: str, - states: set[OperationalState], + state: OperationalState | None = None, after: int = _BEFORE_GLOBAL_FIRST_BATCH_INDEX, ) -> list[Batch]: """Get batches filtered by state and optionally by index.""" batches_sequence: list[Batch] = [] batches_hash_set: set[str] = set() # TODO: Check if this is necessary. - last_finalized_index = self.apps[app_name]["last_finalized_batch"].get( - "index", self._BEFORE_GLOBAL_FIRST_BATCH_INDEX - ) - current_chunk = math.ceil((after + 1) / zconfig.SNAPSHOT_CHUNK) - next_chunk = math.ceil( - (after + 1 + zconfig.API_BATCHES_LIMIT) / zconfig.SNAPSHOT_CHUNK - ) - finalized_chunk = math.ceil(last_finalized_index / zconfig.SNAPSHOT_CHUNK) - if current_chunk != finalized_chunk: - loaded_finalized_batches = self._load_finalized_batches(app_name, after + 1) - self._filter_then_add_the_finalized_batches( - loaded_finalized_batches, - states, - after, - batches_sequence, - batches_hash_set, - ) + includes_finalized_batches = state is None or state == "finalized" - if len(batches_sequence) < zconfig.API_BATCHES_LIMIT and next_chunk not in [ - current_chunk, - finalized_chunk, - ]: - loaded_finalized_batches = self._load_finalized_batches( - app_name, after + 1 + len(batches_sequence) + if includes_finalized_batches: + last_finalized_index = self.apps[app_name]["last_finalized_batch"].get( + "index", self._BEFORE_GLOBAL_FIRST_BATCH_INDEX ) - self._filter_then_add_the_finalized_batches( - loaded_finalized_batches, - states, - after, - batches_sequence, - batches_hash_set, + current_chunk = math.ceil((after + 1) / zconfig.SNAPSHOT_CHUNK) + next_chunk = math.ceil( + (after + 1 + zconfig.API_BATCHES_LIMIT) / zconfig.SNAPSHOT_CHUNK ) + finalized_chunk = math.ceil(last_finalized_index / zconfig.SNAPSHOT_CHUNK) - self._filter_then_add_the_finalized_batches( - self.apps[app_name]["operational_batches_sequence"], - states, + if current_chunk != finalized_chunk: + loaded_finalized_batches = self._load_finalized_batches( + app_name, after + 1 + ) + self._append_unique_batches_after_index( + loaded_finalized_batches, + after, + batches_sequence, + batches_hash_set, + ) + + if len(batches_sequence) < zconfig.API_BATCHES_LIMIT and next_chunk not in [ + current_chunk, + finalized_chunk, + ]: + loaded_finalized_batches = self._load_finalized_batches( + app_name, after + 1 + len(batches_sequence) + ) + self._append_unique_batches_after_index( + loaded_finalized_batches, + after, + batches_sequence, + batches_hash_set, + ) + + self._append_unique_batches_after_index( + self._filter_operational_batches_sequence( + app_name, self._get_batch_index_interval(app_name, state) + ), after, batches_sequence, batches_hash_set, @@ -465,6 +454,10 @@ def lock_batches(self, app_name: str, signature_data: SignatureData) -> None: signature_data["hash"] not in self.apps[app_name]["operational_batches_hash_index_map"] ): + zlogger.warning( + f"The locking {signature_data=} hash couldn't be found in the " + "operational batches." + ) return for batch in self._filter_operational_batches_sequence( @@ -499,6 +492,10 @@ def finalize_batches(self, app_name: str, signature_data: SignatureData) -> None signature_data["hash"] not in self.apps[app_name]["operational_batches_hash_index_map"] ): + zlogger.warning( + f"The finalizing {signature_data=} hash couldn't be found in the " + "operational batches." + ) return snapshot_indexes: list[int] = [] @@ -524,7 +521,7 @@ def finalize_batches(self, app_name: str, signature_data: SignatureData) -> None if snapshot_index <= self._last_saved_index: continue self._last_saved_index = snapshot_index - self._save_snapshot_then_prune(app_name, snapshot_index) + self._save_finalized_chunk_then_prune(app_name, snapshot_index) def upsert_node_state( self, @@ -739,33 +736,33 @@ def _reinitialize_batches( self.apps[app_name]["last_locked_batch"] = all_nodes_last_finalized_batch self.apps[app_name]["last_finalized_batch"] = all_nodes_last_finalized_batch - def _filter_then_add_the_finalized_batches( + # TODO: This function should be removed as it contains many duplicate + # implementations with other BatchesSequence methods in this class. + def _append_unique_batches_after_index( self, - loaded_finalized_batches: list[Batch], - states: set[OperationalState], + source_batches_sequence: list[Batch], after: int, # TODO: In Python, it's very unusual to use parameters as the output of # a function. - batches_sequence: list[Batch], - batches_hash_set: set[str], + target_batches_sequence: list[Batch], + target_batches_hash_set: set[str], ) -> None: - """Filter and add batches to the result based on state and index.""" - if not loaded_finalized_batches or "finalized" not in states: + """Filter and add batches to the result based on index.""" + if not source_batches_sequence: return - # TODO: Remove relative index calculation duplication. - first_loaded_batch_index = loaded_finalized_batches[0]["index"] - relative_after = after - first_loaded_batch_index + first_batch_index = source_batches_sequence[0]["index"] + relative_after = after - first_batch_index - for batch in loaded_finalized_batches[relative_after + 1 :]: - if len(batches_sequence) >= zconfig.API_BATCHES_LIMIT: + for batch in source_batches_sequence[relative_after + 1 :]: + if len(target_batches_sequence) >= zconfig.API_BATCHES_LIMIT: break - if batch["hash"] in batches_hash_set: + if batch["hash"] in target_batches_hash_set: continue - batches_sequence.append(batch) - batches_hash_set.add(batch["hash"]) + target_batches_sequence.append(batch) + target_batches_hash_set.add(batch["hash"]) def _filter_operational_batches_sequence( self, @@ -864,20 +861,6 @@ def _get_last_batch_index( "index", default ) - def _has_any_batch( - self, - app_name: str, - state: State | None = None, - ) -> bool: - if state is None: - return bool( - self.apps[app_name]["initialized_batches_map"] - ) or self._has_any_operational_batch(app_name, state) - elif state == "initialized": - return bool(self.apps[app_name]["initialized_batches_map"]) - else: - return self._has_any_operational_batch(app_name, state) - def _has_any_operational_batch( self, app_name: str, state: OperationalState | None = None ) -> bool: @@ -911,4 +894,4 @@ def _get_next_operational_state( return None -zdb: InMemoryDB = InMemoryDB() +zdb = InMemoryDB() diff --git a/node/routes.py b/node/routes.py index f2c9be2..de2aafd 100644 --- a/node/routes.py +++ b/node/routes.py @@ -151,7 +151,7 @@ def get_batches(app_name: str, state: str) -> Response: if after < 0: return error_response(ErrorCodes.INVALID_REQUEST, "Invalid after param.") - batches_sequence = zdb.get_global_operational_batches_sequence(app_name, { state }, after) + batches_sequence = zdb.get_global_operational_batches_sequence(app_name, state, after) if not batches_sequence: return success_response(data=None) diff --git a/sequencer/routes.py b/sequencer/routes.py index 9691eb0..48e0953 100644 --- a/sequencer/routes.py +++ b/sequencer/routes.py @@ -62,7 +62,6 @@ def _put_batches(req_data: dict[str, Any]) -> dict[str, Any]: batches_sequence = zdb.get_global_operational_batches_sequence( app_name=req_data["app_name"], - states={"sequenced", "locked", "finalized"}, after=req_data["sequenced_index"], ) last_finalized_batch: dict[str, Any] = zdb.get_last_operational_batch(