From 9ea01b78d86d32915d9c17b655a79b4ffe949072 Mon Sep 17 00:00:00 2001 From: Vlad Stan Date: Thu, 26 Oct 2023 19:46:10 +0300 Subject: [PATCH 01/30] refactor: clean-up --- __init__.py | 2 + crud.py | 8 +- migrations.py | 2 +- models.py | 9 +- nostr/bech32.py | 32 +++-- nostr/client/client.py | 9 +- nostr/delegation.py | 8 +- nostr/key.py | 2 +- nostr/message_pool.py | 14 +- nostr/relay.py | 160 ++++----------------- nostr/relay_manager.py | 43 +++--- nostr/subscription.py | 7 +- router.py | 93 ++++++------ tasks.py | 32 +++-- templates/nostrclient/index.html | 238 +++++++++++++++++++++++-------- views_api.py | 17 ++- 16 files changed, 363 insertions(+), 313 deletions(-) diff --git a/__init__.py b/__init__.py index 7f573e7..84750dc 100644 --- a/__init__.py +++ b/__init__.py @@ -22,6 +22,8 @@ scheduled_tasks: List[asyncio.Task] = [] + +# remove! class NostrClient: def __init__(self): self.client: NostrClientLib = NostrClientLib(connect=False) diff --git a/crud.py b/crud.py index 780642d..c064a9a 100644 --- a/crud.py +++ b/crud.py @@ -1,9 +1,3 @@ -from typing import List, Optional, Union - -import shortuuid - -from lnbits.helpers import urlsafe_short_hash - from . import db from .models import Relay, RelayList @@ -15,7 +9,7 @@ async def get_relays() -> RelayList: async def add_relay(relay: Relay) -> None: await db.execute( - f""" + """ INSERT INTO nostrclient.relays ( id, url, diff --git a/migrations.py b/migrations.py index 5a30e45..73b9ed8 100644 --- a/migrations.py +++ b/migrations.py @@ -3,7 +3,7 @@ async def m001_initial(db): Initial nostrclient table. """ await db.execute( - f""" + """ CREATE TABLE nostrclient.relays ( id TEXT NOT NULL PRIMARY KEY, url TEXT NOT NULL, diff --git a/models.py b/models.py index 88651fc..24730bc 100644 --- a/models.py +++ b/models.py @@ -1,8 +1,5 @@ -from dataclasses import dataclass -from typing import Dict, List, Optional +from typing import List, Optional -from fastapi import Request -from fastapi.param_functions import Query from pydantic import BaseModel, Field from lnbits.helpers import urlsafe_short_hash @@ -14,7 +11,8 @@ class RelayStatus(BaseModel): error_counter: Optional[int] = 0 error_list: Optional[List] = [] notice_list: Optional[List] = [] - + + class Relay(BaseModel): id: Optional[str] = None url: Optional[str] = None @@ -62,6 +60,7 @@ class TestMessage(BaseModel): reciever_public_key: str message: str + class TestMessageResponse(BaseModel): private_key: str public_key: str diff --git a/nostr/bech32.py b/nostr/bech32.py index 61a92c4..0ae6c80 100644 --- a/nostr/bech32.py +++ b/nostr/bech32.py @@ -26,19 +26,22 @@ class Encoding(Enum): """Enumeration type to list the various supported encodings.""" + BECH32 = 1 BECH32M = 2 + CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" -BECH32M_CONST = 0x2bc830a3 +BECH32M_CONST = 0x2BC830A3 + def bech32_polymod(values): """Internal function that computes the Bech32 checksum.""" - generator = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3] + generator = [0x3B6A57B2, 0x26508E6D, 0x1EA119FA, 0x3D4233DD, 0x2A1462B3] chk = 1 for value in values: top = chk >> 25 - chk = (chk & 0x1ffffff) << 5 ^ value + chk = (chk & 0x1FFFFFF) << 5 ^ value for i in range(5): chk ^= generator[i] if ((top >> i) & 1) else 0 return chk @@ -58,6 +61,7 @@ def bech32_verify_checksum(hrp, data): return Encoding.BECH32M return None + def bech32_create_checksum(hrp, data, spec): """Compute the checksum values given HRP and data.""" values = bech32_hrp_expand(hrp) + data @@ -69,26 +73,29 @@ def bech32_create_checksum(hrp, data, spec): def bech32_encode(hrp, data, spec): """Compute a Bech32 string given HRP and data values.""" combined = data + bech32_create_checksum(hrp, data, spec) - return hrp + '1' + ''.join([CHARSET[d] for d in combined]) + return hrp + "1" + "".join([CHARSET[d] for d in combined]) + def bech32_decode(bech): """Validate a Bech32/Bech32m string, and determine HRP and data.""" - if ((any(ord(x) < 33 or ord(x) > 126 for x in bech)) or - (bech.lower() != bech and bech.upper() != bech)): + if (any(ord(x) < 33 or ord(x) > 126 for x in bech)) or ( + bech.lower() != bech and bech.upper() != bech + ): return (None, None, None) bech = bech.lower() - pos = bech.rfind('1') + pos = bech.rfind("1") if pos < 1 or pos + 7 > len(bech) or len(bech) > 90: return (None, None, None) - if not all(x in CHARSET for x in bech[pos+1:]): + if not all(x in CHARSET for x in bech[pos + 1 :]): return (None, None, None) hrp = bech[:pos] - data = [CHARSET.find(x) for x in bech[pos+1:]] + data = [CHARSET.find(x) for x in bech[pos + 1 :]] spec = bech32_verify_checksum(hrp, data) if spec is None: return (None, None, None) return (hrp, data[:-6], spec) + def convertbits(data, frombits, tobits, pad=True): """General power-of-2 base conversion.""" acc = 0 @@ -124,7 +131,12 @@ def decode(hrp, addr): return (None, None) if data[0] == 0 and len(decoded) != 20 and len(decoded) != 32: return (None, None) - if data[0] == 0 and spec != Encoding.BECH32 or data[0] != 0 and spec != Encoding.BECH32M: + if ( + data[0] == 0 + and spec != Encoding.BECH32 + or data[0] != 0 + and spec != Encoding.BECH32M + ): return (None, None) return (data[0], decoded) diff --git a/nostr/client/client.py b/nostr/client/client.py index db07a06..40231a1 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -1,11 +1,13 @@ import asyncio from typing import List +from loguru import logger + from ..relay_manager import RelayManager class NostrClient: - relays = [ ] + relays = [] relay_manager = RelayManager() def __init__(self, relays: List[str] = [], connect=True): @@ -16,7 +18,10 @@ def __init__(self, relays: List[str] = [], connect=True): async def connect(self): for relay in self.relays: - self.relay_manager.add_relay(relay) + try: + self.relay_manager.add_relay(relay) + except Exception as e: + logger.debug(e) def close(self): self.relay_manager.close_connections() diff --git a/nostr/delegation.py b/nostr/delegation.py index 94801f5..8b1c311 100644 --- a/nostr/delegation.py +++ b/nostr/delegation.py @@ -7,23 +7,23 @@ class Delegation: delegator_pubkey: str delegatee_pubkey: str event_kind: int - duration_secs: int = 30*24*60 # default to 30 days + duration_secs: int = 30 * 24 * 60 # default to 30 days signature: str = None # set in PrivateKey.sign_delegation @property def expires(self) -> int: return int(time.time()) + self.duration_secs - + @property def conditions(self) -> str: return f"kind={self.event_kind}&created_at<{self.expires}" - + @property def delegation_token(self) -> str: return f"nostr:delegation:{self.delegatee_pubkey}:{self.conditions}" def get_tag(self) -> list[str]: - """ Called by Event """ + """Called by Event""" return [ "delegation", self.delegator_pubkey, diff --git a/nostr/key.py b/nostr/key.py index 8089e11..3c81e94 100644 --- a/nostr/key.py +++ b/nostr/key.py @@ -37,7 +37,7 @@ def from_npub(cls, npub: str): class PrivateKey: def __init__(self, raw_secret: bytes = None) -> None: - if not raw_secret is None: + if raw_secret is not None: self.raw_secret = raw_secret else: self.raw_secret = secrets.token_bytes(32) diff --git a/nostr/message_pool.py b/nostr/message_pool.py index 02f7fd4..e38e66e 100644 --- a/nostr/message_pool.py +++ b/nostr/message_pool.py @@ -69,7 +69,7 @@ def _process_message(self, message: str, url: str): e["sig"], ) with self.lock: - if not f"{subscription_id}_{event.id}" in self._unique_events: + if f"{subscription_id}_{event.id}" not in self._unique_events: self._accept_event(EventMessage(event, subscription_id, url)) elif message_type == RelayMessageType.NOTICE: self.notices.put(NoticeMessage(message_json[1], url)) @@ -78,10 +78,12 @@ def _process_message(self, message: str, url: str): def _accept_event(self, event_message: EventMessage): """ - Event uniqueness is considered per `subscription_id`. - The `subscription_id` is rewritten to be unique and it is the same accross relays. - The same event can come from different subscriptions (from the same client or from different ones). - Clients that have joined later should receive older events. + Event uniqueness is considered per `subscription_id`. + The `subscription_id` is rewritten to be unique and it is the same accross relays. + The same event can come from different subscriptions (from the same client or from different ones). + Clients that have joined later should receive older events. """ self.events.put(event_message) - self._unique_events.add(f"{event_message.subscription_id}_{event_message.event.id}") \ No newline at end of file + self._unique_events.add( + f"{event_message.subscription_id}_{event_message.event.id}" + ) diff --git a/nostr/relay.py b/nostr/relay.py index caacba0..0630ae5 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -2,43 +2,23 @@ import json import time from queue import Queue -from threading import Lock from typing import List from loguru import logger from websocket import WebSocketApp -from .event import Event -from .filter import Filters from .message_pool import MessagePool -from .message_type import RelayMessageType from .subscription import Subscription -class RelayPolicy: - def __init__(self, should_read: bool = True, should_write: bool = True) -> None: - self.should_read = should_read - self.should_write = should_write - - def to_json_object(self) -> dict[str, bool]: - return {"read": self.should_read, "write": self.should_write} - - class Relay: - def __init__( - self, - url: str, - policy: RelayPolicy, - message_pool: MessagePool, - subscriptions: dict[str, Subscription] = {}, - ) -> None: + def __init__(self, url: str, message_pool: MessagePool) -> None: self.url = url - self.policy = policy self.message_pool = message_pool - self.subscriptions = subscriptions self.connected: bool = False self.reconnect: bool = True self.shutdown: bool = False + # todo: extract stats self.error_counter: int = 0 self.error_threshold: int = 100 self.error_list: List[str] = [] @@ -47,12 +27,10 @@ def __init__( self.num_received_events: int = 0 self.num_sent_events: int = 0 self.num_subscriptions: int = 0 - self.ssl_options: dict = {} - self.proxy: dict = {} - self.lock = Lock() + self.queue = Queue() - def connect(self, ssl_options: dict = None, proxy: dict = None): + def connect(self): self.ws = WebSocketApp( self.url, on_open=self._on_open, @@ -62,19 +40,14 @@ def connect(self, ssl_options: dict = None, proxy: dict = None): on_ping=self._on_ping, on_pong=self._on_pong, ) - self.ssl_options = ssl_options - self.proxy = proxy if not self.connected: - self.ws.run_forever( - sslopt=ssl_options, - http_proxy_host=None if proxy is None else proxy.get("host"), - http_proxy_port=None if proxy is None else proxy.get("port"), - proxy_type=None if proxy is None else proxy.get("type"), - ping_interval=5, - ) + self.ws.run_forever(ping_interval=10) def close(self): - self.ws.close() + try: + self.ws.close() + except Exception as e: + logger.warning(f"[Relay: {self.url}] Failed to close websocket: {e}") self.connected = False self.shutdown = True @@ -90,10 +63,10 @@ def ping(self): def publish(self, message: str): self.queue.put(message) - def publish_subscriptions(self): - for _, subscription in self.subscriptions.items(): + def publish_subscriptions(self, subscriptions: List[Subscription] = []): + for subscription in subscriptions: s = subscription.to_json_object() - json_str = json.dumps(["REQ", s["id"], s["filters"][0]]) + json_str = json.dumps(["REQ", s["id"], s["filters"]]) self.publish(json_str) async def queue_worker(self): @@ -107,51 +80,36 @@ async def queue_worker(self): pass else: await asyncio.sleep(1) - - if self.shutdown: - logger.warning(f"Closing queue worker for '{self.url}'.") - break - def add_subscription(self, id, filters: Filters): - with self.lock: - self.subscriptions[id] = Subscription(id, filters) + if self.shutdown: + logger.warning(f"[Relay: {self.url}] Closing queue worker.") + return def close_subscription(self, id: str) -> None: - with self.lock: - self.subscriptions.pop(id) - self.publish(json.dumps(["CLOSE", id])) - - def to_json_object(self) -> dict: - return { - "url": self.url, - "policy": self.policy.to_json_object(), - "subscriptions": [ - subscription.to_json_object() - for subscription in self.subscriptions.values() - ], - } + self.publish(json.dumps(["CLOSE", id])) def add_notice(self, notice: str): - self.notice_list = ([notice] + self.notice_list)[:20] + self.notice_list = [notice] + self.notice_list def _on_open(self, _): - logger.info(f"Connected to relay: '{self.url}'.") + logger.info(f"[Relay: {self.url}] Connected.") self.connected = True - + self.shutdown = False + def _on_close(self, _, status_code, message): - logger.warning(f"Connection to relay {self.url} closed. Status: '{status_code}'. Message: '{message}'.") + logger.warning( + f"[Relay: {self.url}] Connection closed. Status: '{status_code}'. Message: '{message}'." + ) self.close() def _on_message(self, _, message: str): - if self._is_valid_message(message): - self.num_received_events += 1 - self.message_pool.add_message(message, self.url) + self.num_received_events += 1 + self.message_pool.add_message(message, self.url) def _on_error(self, _, error): - logger.warning(f"Relay error: '{str(error)}'") + logger.warning(f"[Relay: {self.url}] Error: '{str(error)}'") self._append_error_message(str(error)) - self.connected = False - self.error_counter += 1 + self.close() def _on_ping(self, *_): return @@ -159,65 +117,7 @@ def _on_ping(self, *_): def _on_pong(self, *_): return - def _is_valid_message(self, message: str) -> bool: - message = message.strip("\n") - if not message or message[0] != "[" or message[-1] != "]": - return False - - message_json = json.loads(message) - message_type = message_json[0] - - if not RelayMessageType.is_valid(message_type): - return False - - if message_type == RelayMessageType.EVENT: - return self._is_valid_event_message(message_json) - - if message_type == RelayMessageType.COMMAND_RESULT: - return self._is_valid_command_result_message(message, message_json) - - return True - - def _is_valid_event_message(self, message_json): - if not len(message_json) == 3: - return False - - subscription_id = message_json[1] - with self.lock: - if subscription_id not in self.subscriptions: - return False - - e = message_json[2] - event = Event( - e["content"], - e["pubkey"], - e["created_at"], - e["kind"], - e["tags"], - e["sig"], - ) - if not event.verify(): - return False - - with self.lock: - subscription = self.subscriptions[subscription_id] - - if subscription.filters and not subscription.filters.match(event): - return False - - return True - - def _is_valid_command_result_message(self, message, message_json): - if not len(message_json) < 3: - return False - - if message_json[2] != True: - logger.warning(f"Relay '{self.url}' negative command result: '{message}'") - self._append_error_message(message) - return False - - return True - def _append_error_message(self, message): - self.error_list = ([message] + self.error_list)[:20] - self.last_error_date = int(time.time()) \ No newline at end of file + self.error_counter += 1 + self.error_list = [message] + self.error_list + self.last_error_date = int(time.time()) diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index f639fb0..9ec2850 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -1,4 +1,3 @@ - import asyncio import ssl import threading @@ -8,7 +7,7 @@ from .filter import Filters from .message_pool import MessagePool, NoticeMessage -from .relay import Relay, RelayPolicy +from .relay import Relay from .subscription import Subscription @@ -25,40 +24,36 @@ def __init__(self) -> None: self._cached_subscriptions: dict[str, Subscription] = {} self._subscriptions_lock = threading.Lock() - def add_relay(self, url: str, read: bool = True, write: bool = True) -> Relay: + def add_relay(self, url: str) -> Relay: if url in list(self.relays.keys()): return - - with self._subscriptions_lock: - subscriptions = self._cached_subscriptions.copy() - policy = RelayPolicy(read, write) - relay = Relay(url, policy, self.message_pool, subscriptions) + relay = Relay(url, self.message_pool) self.relays[url] = relay self._open_connection( - relay, - {"cert_reqs": ssl.CERT_NONE} + relay, {"cert_reqs": ssl.CERT_NONE} ) # NOTE: This disables ssl certificate verification - relay.publish_subscriptions() + relay.publish_subscriptions(self._cached_subscriptions.values()) return relay def remove_relay(self, url: str): + # try-catch? self.relays[url].close() self.relays.pop(url) self.threads[url].join(timeout=5) self.threads.pop(url) self.queue_threads[url].join(timeout=5) self.queue_threads.pop(url) - def add_subscription(self, id: str, filters: Filters): + s = Subscription(id, filters) with self._subscriptions_lock: - self._cached_subscriptions[id] = Subscription(id, filters) + self._cached_subscriptions[id] = s for relay in self.relays.values(): - relay.add_subscription(id, filters) + relay.publish_subscriptions([s]) def close_subscription(self, id: str): with self._subscriptions_lock: @@ -72,22 +67,22 @@ def check_and_restart_relays(self): for relay in stopped_relays: self._restart_relay(relay) - def close_connections(self): for relay in self.relays.values(): relay.close() def publish_message(self, message: str): for relay in self.relays.values(): - if relay.policy.should_write: - relay.publish(message) + relay.publish(message) def handle_notice(self, notice: NoticeMessage): relay = next((r for r in self.relays.values() if r.url == notice.url)) if relay: relay.add_notice(notice.content) - def _open_connection(self, relay: Relay, ssl_options: dict = None, proxy: dict = None): + def _open_connection( + self, relay: Relay, ssl_options: dict = None, proxy: dict = None + ): self.threads[relay.url] = threading.Thread( target=relay.connect, args=(ssl_options, proxy), @@ -98,7 +93,7 @@ def _open_connection(self, relay: Relay, ssl_options: dict = None, proxy: dict = def wrap_async_queue_worker(): asyncio.run(relay.queue_worker()) - + self.queue_threads[relay.url] = threading.Thread( target=wrap_async_queue_worker, name=f"{relay.url}-queue", @@ -108,14 +103,16 @@ def wrap_async_queue_worker(): def _restart_relay(self, relay: Relay): time_since_last_error = time.time() - relay.last_error_date - - min_wait_time = min(60 * relay.error_counter, 60 * 60 * 24) # try at least once a day + + min_wait_time = min( + 60 * relay.error_counter, 60 * 60 * 24 + ) # try at least once a day if time_since_last_error < min_wait_time: return - + logger.info(f"Restarting connection to relay '{relay.url}'") self.remove_relay(relay.url) new_relay = self.add_relay(relay.url) new_relay.error_counter = relay.error_counter - new_relay.error_list = relay.error_list \ No newline at end of file + new_relay.error_list = relay.error_list diff --git a/nostr/subscription.py b/nostr/subscription.py index 76da0af..10b5363 100644 --- a/nostr/subscription.py +++ b/nostr/subscription.py @@ -2,12 +2,9 @@ class Subscription: - def __init__(self, id: str, filters: Filters=None) -> None: + def __init__(self, id: str, filters: Filters = None) -> None: self.id = id self.filters = filters def to_json_object(self): - return { - "id": self.id, - "filters": self.filters.to_json_array() - } + return {"id": self.id, "filters": self.filters.to_json_array()} diff --git a/router.py b/router.py index cc0a380..c6a0a91 100644 --- a/router.py +++ b/router.py @@ -2,7 +2,7 @@ import json from typing import List, Union -from fastapi import WebSocketDisconnect +from fastapi import WebSocket, WebSocketDisconnect from loguru import logger from lnbits.helpers import urlsafe_short_hash @@ -15,28 +15,29 @@ class NostrRouter: - received_subscription_events: dict[str, list[Event]] = {} received_subscription_notices: list[NoticeMessage] = [] received_subscription_eosenotices: dict[str, EndOfStoredEventsMessage] = {} - def __init__(self, websocket): + def __init__(self, websocket: WebSocket): self.subscriptions: List[str] = [] self.connected: bool = True - self.websocket = websocket - self.tasks: List[asyncio.Task] = [] - self.original_subscription_ids = {} + self.websocket: WebSocket = websocket + self.tasks: List[asyncio.Task] = [] # chek why state is needed + self.original_subscription_ids = {} # here async def client_to_nostr(self): - """Receives requests / data from the client and forwards it to relays. If the + """ + Receives requests / data from the client and forwards it to relays. If the request was a subscription/filter, registers it with the nostr client lib. Remembers the subscription id so we can send back responses from the relay to this - client in `nostr_to_client`""" - while True: + client in `nostr_to_client` + """ + while self.connected: try: json_str = await self.websocket.receive_text() except WebSocketDisconnect: - self.connected = False + self.stop() break try: @@ -44,15 +45,15 @@ async def client_to_nostr(self): except Exception as e: logger.debug(f"Failed to handle client message: '{str(e)}'.") - async def nostr_to_client(self): - """Sends responses from relays back to the client. Polls the subscriptions of this client + """ + Sends responses from relays back to the client. Polls the subscriptions of this client stored in `my_subscriptions`. Then gets all responses for this subscription id from `received_subscription_events` which is filled in tasks.py. Takes one response after the other and relays it back to the client. Reconstructs the reponse manually because the nostr client lib we're using can't do it. Reconstructs the original subscription id that we had previously rewritten in order to avoid collisions when multiple clients use the same id. """ - while True and self.connected: + while self.connected: try: await self._handle_subscriptions() self._handle_notices() @@ -60,12 +61,12 @@ async def nostr_to_client(self): logger.debug(f"Failed to handle response for client: '{str(e)}'.") await asyncio.sleep(0.1) - async def start(self): + self.connected = True self.tasks.append(asyncio.create_task(self.client_to_nostr())) self.tasks.append(asyncio.create_task(self.nostr_to_client())) - async def stop(self): + def stop(self): for t in self.tasks: try: t.cancel() @@ -77,6 +78,11 @@ async def stop(self): nostr.client.relay_manager.close_subscription(s) except: pass + + try: + self.websocket.close() + except: + pass self.connected = False async def _handle_subscriptions(self): @@ -86,8 +92,6 @@ async def _handle_subscriptions(self): if s in NostrRouter.received_subscription_eosenotices: await self._handle_received_subscription_eosenotices(s) - - async def _handle_received_subscription_eosenotices(self, s): try: if s not in self.original_subscription_ids: @@ -95,7 +99,7 @@ async def _handle_received_subscription_eosenotices(self, s): s_original = self.original_subscription_ids[s] event_to_forward = ["EOSE", s_original] del NostrRouter.received_subscription_eosenotices[s] - + await self.websocket.send_text(json.dumps(event_to_forward)) except Exception as e: logger.debug(e) @@ -104,18 +108,18 @@ async def _handle_received_subscription_events(self, s): try: if s not in NostrRouter.received_subscription_events: return + while len(NostrRouter.received_subscription_events[s]): my_event = NostrRouter.received_subscription_events[s].pop(0) - # event.to_message() does not include the subscription ID, we have to add it manually event_json = { - "id": my_event.id, - "pubkey": my_event.public_key, - "created_at": my_event.created_at, - "kind": my_event.kind, - "tags": my_event.tags, - "content": my_event.content, - "sig": my_event.signature, - } + "id": my_event.id, + "pubkey": my_event.public_key, + "created_at": my_event.created_at, + "kind": my_event.kind, + "tags": my_event.tags, + "content": my_event.content, + "sig": my_event.signature, + } # this reconstructs the original response from the relay # reconstruct original subscription id @@ -123,18 +127,17 @@ async def _handle_received_subscription_events(self, s): event_to_forward = ["EVENT", s_original, event_json] await self.websocket.send_text(json.dumps(event_to_forward)) except Exception as e: - logger.debug(e) + logger.debug(e) # there are 2900 errors here def _handle_notices(self): while len(NostrRouter.received_subscription_notices): my_event = NostrRouter.received_subscription_notices.pop(0) # note: we don't send it to the user because we don't know who should receive it - logger.info(f"Relay ('{my_event.url}') notice: '{my_event.content}']") + logger.info(f"[Relay '{my_event.url}'] Notice: '{my_event.content}']") nostr.client.relay_manager.handle_notice(my_event) - - def _marshall_nostr_filters(self, data: Union[dict, list]): + # todo: get rid of this filters = data if isinstance(data, list) else [data] filters = [Filter.parse_obj(f) for f in filters] filter_list: list[NostrFilter] = [] @@ -161,13 +164,12 @@ async def _handle_client_to_nostr(self, json_str): """ json_data = json.loads(json_str) - assert len(json_data) - + assert len(json_data), "Bad JSON array" if json_data[0] == "REQ": self._handle_client_req(json_data) return - + if json_data[0] == "CLOSE": self._handle_client_close(json_data[1]) return @@ -181,18 +183,25 @@ def _handle_client_req(self, json_data): subscription_id_rewritten = urlsafe_short_hash() self.original_subscription_ids[subscription_id_rewritten] = subscription_id fltr = json_data[2:] - filters = self._marshall_nostr_filters(fltr) + filters = self._marshall_nostr_filters(fltr) # revisit - nostr.client.relay_manager.add_subscription( - subscription_id_rewritten, filters - ) + nostr.client.relay_manager.add_subscription(subscription_id_rewritten, filters) request_rewritten = json.dumps([json_data[0], subscription_id_rewritten] + fltr) - - self.subscriptions.append(subscription_id_rewritten) - nostr.client.relay_manager.publish_message(request_rewritten) + + self.subscriptions.append(subscription_id_rewritten) # why here also? + nostr.client.relay_manager.publish_message( + request_rewritten + ) # both `add_subscription` and `publish_message`? def _handle_client_close(self, subscription_id): - subscription_id_rewritten = next((k for k, v in self.original_subscription_ids.items() if v == subscription_id), None) + subscription_id_rewritten = next( + ( + k + for k, v in self.original_subscription_ids.items() + if v == subscription_id + ), + None, + ) if subscription_id_rewritten: self.original_subscription_ids.pop(subscription_id_rewritten) nostr.client.relay_manager.close_subscription(subscription_id_rewritten) diff --git a/tasks.py b/tasks.py index 4c316bc..f025b0f 100644 --- a/tasks.py +++ b/tasks.py @@ -9,6 +9,7 @@ from .router import NostrRouter, nostr +#### revisit async def init_relays(): # reinitialize the entire client nostr.__init__() @@ -20,14 +21,14 @@ async def init_relays(): async def check_relays(): - """ Check relays that have been disconnected """ + """Check relays that have been disconnected""" while True: try: await asyncio.sleep(20) nostr.client.relay_manager.check_and_restart_relays() except Exception as e: logger.warning(f"Cannot restart relays: '{str(e)}'.") - + async def subscribe_events(): while not any([r.connected for r in nostr.client.relay_manager.relays.values()]): @@ -39,14 +40,16 @@ def callback_events(eventMessage: EventMessage): if eventMessage.event.id in set( [ e.id - for e in NostrRouter.received_subscription_events[eventMessage.subscription_id] + for e in NostrRouter.received_subscription_events[ + eventMessage.subscription_id + ] ] ): return - NostrRouter.received_subscription_events[eventMessage.subscription_id].append( - eventMessage.event - ) + NostrRouter.received_subscription_events[ + eventMessage.subscription_id + ].append(eventMessage.event) else: NostrRouter.received_subscription_events[eventMessage.subscription_id] = [ eventMessage.event @@ -59,7 +62,10 @@ def callback_notices(noticeMessage: NoticeMessage): return def callback_eose_notices(eventMessage: EndOfStoredEventsMessage): - if eventMessage.subscription_id not in NostrRouter.received_subscription_eosenotices: + if ( + eventMessage.subscription_id + not in NostrRouter.received_subscription_eosenotices + ): NostrRouter.received_subscription_eosenotices[ eventMessage.subscription_id ] = eventMessage @@ -67,11 +73,13 @@ def callback_eose_notices(eventMessage: EndOfStoredEventsMessage): return def wrap_async_subscribe(): - asyncio.run(nostr.client.subscribe( - callback_events, - callback_notices, - callback_eose_notices, - )) + asyncio.run( + nostr.client.subscribe( + callback_events, + callback_notices, + callback_eose_notices, + ) + ) t = threading.Thread( target=wrap_async_subscribe, diff --git a/templates/nostrclient/index.html b/templates/nostrclient/index.html index a0c5999..9e0eb31 100644 --- a/templates/nostrclient/index.html +++ b/templates/nostrclient/index.html @@ -6,13 +6,30 @@
- +
- - - + + @@ -29,18 +46,36 @@
Nostrclient
- +
- +