From 24bf5a688ad82c4d51986b83a2637ac95d9636bd Mon Sep 17 00:00:00 2001 From: "m.chitgarha78" Date: Mon, 29 Jul 2024 15:08:23 +0330 Subject: [PATCH 1/7] Add wait for gathering threshold signatures instead of all signatures --- common/bls.py | 38 ++++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/common/bls.py b/common/bls.py index ca49316..cd16daa 100644 --- a/common/bls.py +++ b/common/bls.py @@ -54,8 +54,8 @@ async def gather_and_aggregate_signatures( message: str = utils.gen_hash(json.dumps(data, sort_keys=True)) - tasks: list[asyncio.Task] = [ - asyncio.create_task( + tasks = { + node_id: asyncio.create_task( request_signature( node_id=node_id, url=f'http://{zconfig.NODES[node_id]["host"]}:{zconfig.NODES[node_id]["port"]}/node/sign_sync_point', @@ -65,8 +65,38 @@ async def gather_and_aggregate_signatures( ) ) for node_id in node_ids - ] - signatures: list[dict[str, Any] | None] = await asyncio.gather(*tasks) + } + + aggregate_timeout = 10 # in seconds + completed_results = [] + pending_tasks = list(tasks.values()) + stake = 0 + start_time = asyncio.get_event_loop().time() + + try: + while 100 * stake / zconfig.TOTAL_STAKE < zconfig.THRESHOLD_PERCENT: + elapsed_time = asyncio.get_event_loop().time() - start_time + remaining_time = aggregate_timeout - elapsed_time + + if remaining_time <= 0: + raise asyncio.TimeoutError + + done, pending = await asyncio.wait(pending_tasks, timeout=remaining_time, return_when=asyncio.FIRST_COMPLETED) + for task in done: + for node_id, node_task in tasks.items(): + if node_task == task: + completed_results.append(task.result()) + stake += zconfig.NODES[node_id]['stake'] + break + if 100 * stake / zconfig.TOTAL_STAKE >= zconfig.THRESHOLD_PERCENT: + for task in pending: + task.cancel() + break + pending_tasks = pending + except asyncio.TimeoutError: + return None + + signatures: list[dict[str, Any] | None] = completed_results signatures_dict: dict[str, dict[str, Any] | None] = dict(zip(node_ids, signatures)) nonsigners = [k for k, v in signatures_dict.items() if v is None] nonsigners += list(set(zconfig.NODES.keys()) - node_ids - set(zconfig.NODE["id"])) From c4b6c2088997246e75916014a9b12b18b28c0263 Mon Sep 17 00:00:00 2001 From: "m.chitgarha78" Date: Tue, 30 Jul 2024 09:51:13 +0330 Subject: [PATCH 2/7] Fix issues regarding #16 issue and simplify the mechanism. --- common/bls.py | 58 ++++++++++++++++++++-------------------------- config.py | 4 ++++ examples/runner.py | 6 +++-- 3 files changed, 33 insertions(+), 35 deletions(-) diff --git a/common/bls.py b/common/bls.py index cd16daa..d292408 100644 --- a/common/bls.py +++ b/common/bls.py @@ -40,7 +40,27 @@ def is_bls_sig_verified( signature.setStr(signature_hex.encode("utf-8")) return signature.verify(public_key, message.encode("utf-8")) - +async def gather_signatures( + sign_tasks: dict[asyncio.Task, str] +) -> dict[str, Any] | None: + """Gather signatures from nodes until the stake of nodes reaches the threshold""" + completed_results = [] + pending_tasks = list(sign_tasks.keys()) + stake_percent = 0 + try: + while stake_percent < zconfig.THRESHOLD_PERCENT: + done, pending = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED) + for task in done: + node_id = sign_tasks[task] + completed_results.append(task.result()) + stake_percent += 100 * zconfig.NODES[node_id]['stake'] / zconfig.TOTAL_STAKE + break + pending_tasks = pending + return completed_results + except Exception as error: + zlogger.exception(f"An unexpected error occurred: {error}") + return completed_results + async def gather_and_aggregate_signatures( data: dict[str, Any], node_ids: set[str] ) -> dict[str, Any] | None: @@ -54,8 +74,8 @@ async def gather_and_aggregate_signatures( message: str = utils.gen_hash(json.dumps(data, sort_keys=True)) - tasks = { - node_id: asyncio.create_task( + sign_tasks: dict[asyncio.Task, str] = { + asyncio.create_task( request_signature( node_id=node_id, url=f'http://{zconfig.NODES[node_id]["host"]}:{zconfig.NODES[node_id]["port"]}/node/sign_sync_point', @@ -63,38 +83,10 @@ async def gather_and_aggregate_signatures( message=message, timeout=120, ) - ) + ) : node_id for node_id in node_ids } - - aggregate_timeout = 10 # in seconds - completed_results = [] - pending_tasks = list(tasks.values()) - stake = 0 - start_time = asyncio.get_event_loop().time() - - try: - while 100 * stake / zconfig.TOTAL_STAKE < zconfig.THRESHOLD_PERCENT: - elapsed_time = asyncio.get_event_loop().time() - start_time - remaining_time = aggregate_timeout - elapsed_time - - if remaining_time <= 0: - raise asyncio.TimeoutError - - done, pending = await asyncio.wait(pending_tasks, timeout=remaining_time, return_when=asyncio.FIRST_COMPLETED) - for task in done: - for node_id, node_task in tasks.items(): - if node_task == task: - completed_results.append(task.result()) - stake += zconfig.NODES[node_id]['stake'] - break - if 100 * stake / zconfig.TOTAL_STAKE >= zconfig.THRESHOLD_PERCENT: - for task in pending: - task.cancel() - break - pending_tasks = pending - except asyncio.TimeoutError: - return None + completed_results = await asyncio.wait_for(gather_signatures(sign_tasks), timeout=zconfig.AGGREGATION_TIMEOUT) signatures: list[dict[str, Any] | None] = completed_results signatures_dict: dict[str, dict[str, Any] | None] = dict(zip(node_ids, signatures)) diff --git a/config.py b/config.py index 550226e..2278307 100644 --- a/config.py +++ b/config.py @@ -51,6 +51,7 @@ def validate_env_variables() -> None: "ZSEQUENCER_SEND_TXS_INTERVAL", "ZSEQUENCER_SYNC_INTERVAL", "ZSEQUENCER_FINALIZATION_TIME_BORDER", + "ZSEQUENCER_SIGNATURES_AGGREGATION_TIMEOUT" ] missing_vars: list[str] = [var for var in required_vars if not os.getenv(var)] @@ -91,6 +92,9 @@ def load_environment_variables(self): self.FINALIZATION_TIME_BORDER: int = int( os.getenv("ZSEQUENCER_FINALIZATION_TIME_BORDER", "120") ) + self.AGGREGATION_TIMEOUT: int = int( + os.getenv("ZSEQUENCER_SIGNATURES_AGGREGATION_TIMEOUT", "120") + ) self.NODES_FILE: str = os.getenv("ZSEQUENCER_NODES_FILE", "nodes.json") self.NODES: dict[str, dict[str, Any]] = self.load_json_file(self.NODES_FILE) for node in self.NODES.values(): diff --git a/examples/runner.py b/examples/runner.py index b49c154..6b3ef94 100644 --- a/examples/runner.py +++ b/examples/runner.py @@ -23,6 +23,7 @@ ZSEQUENCER_SEND_TXS_INTERVAL: float = 0.01 ZSEQUENCER_SYNC_INTERVAL: float = 0.01 ZSEQUENCER_FINALIZATION_TIME_BORDER: int = 120 +ZSEQUENCER_SIGNATURES_AGGREGATION_TIMEOUT = 10 APP_NAME: str = "simple_app" @@ -118,8 +119,9 @@ def main() -> None: "ZSEQUENCER_SEND_TXS_INTERVAL": str(ZSEQUENCER_SEND_TXS_INTERVAL), "ZSEQUENCER_SYNC_INTERVAL": str(ZSEQUENCER_SYNC_INTERVAL), "ZSEQUENCER_FINALIZATION_TIME_BORDER": str( - ZSEQUENCER_FINALIZATION_TIME_BORDER - ), + ZSEQUENCER_FINALIZATION_TIME_BORDER), + "ZSEQUENCER_SIGNATURES_AGGREGATION_TIMEOUT": str( + ZSEQUENCER_SIGNATURES_AGGREGATION_TIMEOUT), "ZSEQUENCER_ENV_PATH": f"{DST_DIR}/node{i + 1}", } From 93f2b358117a9d42db3f21d613ad5463821a510b Mon Sep 17 00:00:00 2001 From: "m.chitgarha78" Date: Tue, 30 Jul 2024 10:21:08 +0330 Subject: [PATCH 3/7] Fix #16 issues --- config.py | 2 +- examples/runner.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config.py b/config.py index 2278307..c05f9a3 100644 --- a/config.py +++ b/config.py @@ -93,7 +93,7 @@ def load_environment_variables(self): os.getenv("ZSEQUENCER_FINALIZATION_TIME_BORDER", "120") ) self.AGGREGATION_TIMEOUT: int = int( - os.getenv("ZSEQUENCER_SIGNATURES_AGGREGATION_TIMEOUT", "120") + os.getenv("ZSEQUENCER_SIGNATURES_AGGREGATION_TIMEOUT", "5") ) self.NODES_FILE: str = os.getenv("ZSEQUENCER_NODES_FILE", "nodes.json") self.NODES: dict[str, dict[str, Any]] = self.load_json_file(self.NODES_FILE) diff --git a/examples/runner.py b/examples/runner.py index 6b3ef94..e463e6e 100644 --- a/examples/runner.py +++ b/examples/runner.py @@ -23,7 +23,7 @@ ZSEQUENCER_SEND_TXS_INTERVAL: float = 0.01 ZSEQUENCER_SYNC_INTERVAL: float = 0.01 ZSEQUENCER_FINALIZATION_TIME_BORDER: int = 120 -ZSEQUENCER_SIGNATURES_AGGREGATION_TIMEOUT = 10 +ZSEQUENCER_SIGNATURES_AGGREGATION_TIMEOUT = 5 APP_NAME: str = "simple_app" From 19ccb2e09610120685bff0309014b1c32a532290 Mon Sep 17 00:00:00 2001 From: "m.chitgarha78" Date: Tue, 30 Jul 2024 10:30:00 +0330 Subject: [PATCH 4/7] Remove break when iterating on done tasks. --- common/bls.py | 1 - 1 file changed, 1 deletion(-) diff --git a/common/bls.py b/common/bls.py index d292408..899ccbd 100644 --- a/common/bls.py +++ b/common/bls.py @@ -54,7 +54,6 @@ async def gather_signatures( node_id = sign_tasks[task] completed_results.append(task.result()) stake_percent += 100 * zconfig.NODES[node_id]['stake'] / zconfig.TOTAL_STAKE - break pending_tasks = pending return completed_results except Exception as error: From 34a661520fa851003cdaeb5886bcddd044daf10f Mon Sep 17 00:00:00 2001 From: "m.chitgarha78" Date: Tue, 30 Jul 2024 10:34:35 +0330 Subject: [PATCH 5/7] handle timeout asyncio.wait_for function. --- common/bls.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/bls.py b/common/bls.py index 899ccbd..7a66f57 100644 --- a/common/bls.py +++ b/common/bls.py @@ -85,7 +85,10 @@ async def gather_and_aggregate_signatures( ) : node_id for node_id in node_ids } - completed_results = await asyncio.wait_for(gather_signatures(sign_tasks), timeout=zconfig.AGGREGATION_TIMEOUT) + try: + completed_results = await asyncio.wait_for(gather_signatures(sign_tasks), timeout=zconfig.AGGREGATION_TIMEOUT) + except asyncio.TimeoutError: + return None signatures: list[dict[str, Any] | None] = completed_results signatures_dict: dict[str, dict[str, Any] | None] = dict(zip(node_ids, signatures)) From 9ceecf0c21e661345c452482497b6f43e6371368 Mon Sep 17 00:00:00 2001 From: "m.chitgarha78" Date: Tue, 30 Jul 2024 10:53:22 +0330 Subject: [PATCH 6/7] Add zlogger exception loggin for signature aggregation. --- common/bls.py | 1 + 1 file changed, 1 insertion(+) diff --git a/common/bls.py b/common/bls.py index 7a66f57..6004a58 100644 --- a/common/bls.py +++ b/common/bls.py @@ -88,6 +88,7 @@ async def gather_and_aggregate_signatures( try: completed_results = await asyncio.wait_for(gather_signatures(sign_tasks), timeout=zconfig.AGGREGATION_TIMEOUT) except asyncio.TimeoutError: + zlogger.exception(f"Aggregation of signatures timed out after {zconfig.AGGREGATION_TIMEOUT} seconds.") return None signatures: list[dict[str, Any] | None] = completed_results From 0c817014b144755da6c9e519aa58221ef5af2dde Mon Sep 17 00:00:00 2001 From: abramsymons Date: Tue, 30 Jul 2024 17:15:54 +0330 Subject: [PATCH 7/7] resolves #11; check nonsigners stake on verification --- node/tasks.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/node/tasks.py b/node/tasks.py index 3e29821..96bc8b9 100644 --- a/node/tasks.py +++ b/node/tasks.py @@ -175,6 +175,11 @@ def is_sync_point_signature_verified( nonsigners: list[str], ) -> bool: """Verify the BLS signature of a synchronization point.""" + nonsigners_stake = sum([zconfig.NODES[node_id]['stake'] for node_id in nonsigners]) + if 100 * nonsigners_stake / zconfig.TOTAL_STAKE > 100 - zconfig.THRESHOLD_PERCENT: + zlogger.exception("Invalid signature from sequencer") + return False + public_key: attestation.G2Point = bls.get_signers_aggregated_public_key(nonsigners) message: str = utils.gen_hash( json.dumps(