diff --git a/.github/workflows/integrations_test.yml b/.github/workflows/integrations_test.yml index 9c40121c62..63191da41f 100644 --- a/.github/workflows/integrations_test.yml +++ b/.github/workflows/integrations_test.yml @@ -48,6 +48,9 @@ jobs: - name: Testnet4 Bootstrap And Catchup test-path: integrations/testnet4_test.py + - name: Testnet4 Start and Shutdown + test-path: integrations/shutdown_test.py + # Other tests # run the docker-compose tests diff --git a/counterparty-core/counterpartycore/lib/api/apiserver.py b/counterparty-core/counterpartycore/lib/api/apiserver.py index 2a6bbcd8b4..598986389a 100644 --- a/counterparty-core/counterpartycore/lib/api/apiserver.py +++ b/counterparty-core/counterpartycore/lib/api/apiserver.py @@ -493,8 +493,9 @@ def run_apiserver( logger.info("Starting API Server process...") def handle_interrupt_signal(signum, frame): - logger.warning("Keyboard interrupt received. Shutting down...") - raise KeyboardInterrupt + pass + # logger.warning("Keyboard interrupt received. Shutting down...") + # raise KeyboardInterrupt wsgi_server = None parent_checker = None @@ -524,7 +525,11 @@ def handle_interrupt_signal(signum, frame): app = init_flask_app() app.shared_backend_height = shared_backend_height - wsgi_server = wsgi.WSGIApplication(app, args=args) + try: + wsgi_server = wsgi.WSGIApplication(app, args=args) + except OSError as e: + logger.error(f"Error starting WSGI Server: {e}") + exit(1) logger.info("Starting Parent Process Checker thread...") parent_checker = ParentProcessChecker(wsgi_server, stop_event, parent_pid) @@ -535,9 +540,6 @@ def handle_interrupt_signal(signum, frame): wsgi_server.run(server_ready_value, shared_backend_height) - except KeyboardInterrupt: - pass - finally: logger.info("Stopping API Server...") @@ -554,6 +556,7 @@ def handle_interrupt_signal(signum, frame): watcher.join() logger.info("API Server stopped.") + server_ready_value.value = 2 # This thread is used for the following two reasons: diff --git a/counterparty-core/counterpartycore/lib/api/wsgi.py b/counterparty-core/counterpartycore/lib/api/wsgi.py index a2a82b4972..c50f6591da 100644 --- a/counterparty-core/counterpartycore/lib/api/wsgi.py +++ b/counterparty-core/counterpartycore/lib/api/wsgi.py @@ -130,7 +130,8 @@ def spawn_worker(self): logger.info("Worker exiting (pid: %s)", worker.pid) try: worker.tmp.close() - self.cfg.worker_exit(self, worker) + sys.exit(-1) + # self.cfg.worker_exit(self, worker) except Exception: logger.warning("Exception during worker exit") @@ -206,7 +207,7 @@ def __init__(self, app, args=None): self.arbiter = None self.ledger_db = None self.state_db = None - + self.current_state_thread = None self.master_pid = os.getpid() super().__init__() @@ -236,7 +237,8 @@ def run(self, server_ready_value, shared_backend_height): sys.exit(1) def stop(self): - self.current_state_thread.stop() + if self.current_state_thread: + self.current_state_thread.stop() if self.arbiter and self.master_pid == os.getpid(): logger.info("Stopping Gunicorn") self.arbiter.kill_all_workers() diff --git a/counterparty-core/counterpartycore/lib/backend/bitcoind.py b/counterparty-core/counterpartycore/lib/backend/bitcoind.py index 661e7d4cc5..e0c2bedb53 100644 --- a/counterparty-core/counterpartycore/lib/backend/bitcoind.py +++ b/counterparty-core/counterpartycore/lib/backend/bitcoind.py @@ -13,6 +13,7 @@ from requests.exceptions import ChunkedEncodingError, ConnectionError, ReadTimeout, Timeout from counterpartycore.lib import config, exceptions +from counterpartycore.lib.ledger.currentstate import CurrentState from counterpartycore.lib.parser import deserialize, utxosinfo logger = logging.getLogger(config.LOGGER_NAME) @@ -36,6 +37,8 @@ def clean_url_for_log(url): # for testing def should_retry(): + if CurrentState().block_parser_status() == "Stopping": + return False return True diff --git a/counterparty-core/counterpartycore/lib/backend/rsfetcher.py b/counterparty-core/counterpartycore/lib/backend/rsfetcher.py index 9bd1951e51..eada9a0363 100644 --- a/counterparty-core/counterpartycore/lib/backend/rsfetcher.py +++ b/counterparty-core/counterpartycore/lib/backend/rsfetcher.py @@ -63,7 +63,6 @@ def __init__(self, indexer_config=None): self.config["log_level"] = config.LOG_LEVEL_STRING else: logger.warning("Using custom indexer config.") - print(indexer_config) self.config = indexer_config self.config["network"] = config.NETWORK_NAME self.fetcher = None diff --git a/counterparty-core/counterpartycore/lib/cli/main.py b/counterparty-core/counterpartycore/lib/cli/main.py index c6a68c3af0..b575d9bf7a 100755 --- a/counterparty-core/counterpartycore/lib/cli/main.py +++ b/counterparty-core/counterpartycore/lib/cli/main.py @@ -385,6 +385,14 @@ def float_range_checker(arg): "help": "Don't parse new blocks, only run the API server", }, ], + [ + ("--catch-up",), + { + "choices": ["normal", "bootstrap", "bootstrap-always"], + "default": "normal", + "help": "Catch up mode (default: normal)", + }, + ], ] @@ -457,12 +465,6 @@ def arg_parser(no_config_file=False, app_name=APP_NAME): parser_server = subparsers.add_parser("start", help="run the server") parser_server.add_argument("--config-file", help="the path to the configuration file") - parser_server.add_argument( - "--catch-up", - choices=["normal", "bootstrap", "bootstrap-always"], - default="normal", - help="Catch up mode (default: normal)", - ) setup.add_config_arguments(parser_server, CONFIG_ARGS, configfile) parser_reparse = subparsers.add_parser( diff --git a/counterparty-core/counterpartycore/lib/cli/server.py b/counterparty-core/counterpartycore/lib/cli/server.py index 7cfcd6a677..cc1a877cf6 100755 --- a/counterparty-core/counterpartycore/lib/cli/server.py +++ b/counterparty-core/counterpartycore/lib/cli/server.py @@ -11,6 +11,7 @@ from urllib.parse import quote_plus as urlencode import appdirs +import apsw import bitcoin as bitcoinlib from termcolor import colored, cprint @@ -635,64 +636,70 @@ def __init__(self): def run(self): self.db = database.get_db_connection(config.DATABASE, read_only=True, check_wal=False) - try: - while not self.stop_event.is_set(): - if time.time() - self.last_check > 60 * 60 * 12: - try: - check.asset_conservation(self.db, self.stop_event) - except exceptions.SanityError as e: - logger.error("Asset conservation check failed: %s" % e) - _thread.interrupt_main() - self.last_check = time.time() - time.sleep(1) - finally: - if self.db is not None: - self.db.close() - self.db = None - logger.info("Thread stopped.") + while not self.stop_event.is_set(): + if time.time() - self.last_check > 60 * 60 * 12: + try: + check.asset_conservation(self.db, self.stop_event) + except exceptions.SanityError as e: + logger.error("Asset conservation check failed: %s" % e) + _thread.interrupt_main() + except apsw.InterruptError: + break + self.last_check = time.time() + time.sleep(1) def stop(self): - self.stop_event.set() logger.info("Stopping Asset Conservation Checker thread...") + self.stop_event.set() + self.db.interrupt() self.join() + if self.db is not None: + self.db.close() + self.db = None + logger.info("Asset Conservation Checker thread stopped.") -def start_all(args, log_stream=None): - api_status_poller = None - apiserver_v1 = None - apiserver_v2 = None - follower_daemon = None - asset_conservation_checker = None - db = None - api_stop_event = None - backend_height_thread = None - - # Log all config parameters, sorted by key - # Filter out default values #TODO: these should be set in a different way - custom_config = { - k: v - for k, v in sorted(config.__dict__.items()) - if not k.startswith("__") and not k.startswith("DEFAULT_") - } - logger.debug(f"Config: {custom_config}") +class CounterpartyServer(threading.Thread): + def __init__(self, args, log_stream=None): + threading.Thread.__init__(self, name="CounterpartyServer") + self.daemon = True + self.args = args + self.api_status_poller = None + self.apiserver_v1 = None + self.apiserver_v2 = None + self.follower_daemon = None + self.asset_conservation_checker = None + self.db = None + self.api_stop_event = None + self.backend_height_thread = None + self.log_stream = log_stream + + # Log all config parameters, sorted by key + # Filter out default values #TODO: these should be set in a different way + custom_config = { + k: v + for k, v in sorted(config.__dict__.items()) + if not k.startswith("__") and not k.startswith("DEFAULT_") + } + logger.debug(f"Config: {custom_config}") - try: + def run(self): # download bootstrap if necessary if ( - not os.path.exists(config.DATABASE) and args.catch_up == "bootstrap" - ) or args.catch_up == "bootstrap-always": - bootstrap.bootstrap(no_confirm=True, snapshot_url=args.bootstrap_url) + not os.path.exists(config.DATABASE) and self.args.catch_up == "bootstrap" + ) or self.args.catch_up == "bootstrap-always": + bootstrap.bootstrap(no_confirm=True, snapshot_url=self.args.bootstrap_url) # Initialise database database.apply_outstanding_migration(config.DATABASE, config.LEDGER_DB_MIGRATIONS_DIR) - db = database.initialise_db() - CurrentState().set_current_block_index(ledger.blocks.last_db_index(db)) - blocks.check_database_version(db) - database.optimize(db) + self.db = database.initialise_db() + CurrentState().set_current_block_index(ledger.blocks.last_db_index(self.db)) + blocks.check_database_version(self.db) + database.optimize(self.db) - if args.rebuild_state_db: + if self.args.rebuild_state_db: dbbuilder.build_state_db() - elif args.refresh_state_db: + elif self.args.refresh_state_db: state_db = database.get_db_connection(config.STATE_DATABASE, read_only=False) dbbuilder.refresh_state_db(state_db) state_db.close() @@ -700,105 +707,97 @@ def start_all(args, log_stream=None): # Check software version check.software_version() - backend_height_thread = BackendHeight() - backend_height_thread.daemon = True - backend_height_thread.start() - CurrentState().set_backend_height_value(backend_height_thread.shared_backend_height) + self.backend_height_thread = BackendHeight() + self.backend_height_thread.daemon = True + self.backend_height_thread.start() + CurrentState().set_backend_height_value(self.backend_height_thread.shared_backend_height) # API Server v2 - api_stop_event = multiprocessing.Event() - apiserver_v2 = api_v2.APIServer(api_stop_event, backend_height_thread.shared_backend_height) - apiserver_v2.start(args, log_stream) - while not apiserver_v2.is_ready() and not apiserver_v2.has_stopped(): + self.api_stop_event = multiprocessing.Event() + self.apiserver_v2 = api_v2.APIServer( + self.api_stop_event, self.backend_height_thread.shared_backend_height + ) + self.apiserver_v2.start(self.args, self.log_stream) + while not self.apiserver_v2.is_ready(): logger.trace("Waiting for API server to start...") + if self.apiserver_v2.has_stopped(): + logger.error("API server stopped unexpectedly.") + return time.sleep(0.1) - if args.api_only: + if self.args.api_only: while True: - api_stop_event.wait(1) + self.api_stop_event.wait(1) return # Backend ensure_backend_is_up() # API Status Poller - api_status_poller = apiv1.APIStatusPoller() - api_status_poller.daemon = True - api_status_poller.start() + self.api_status_poller = apiv1.APIStatusPoller() + self.api_status_poller.daemon = True + self.api_status_poller.start() # API Server v1 - apiserver_v1 = apiv1.APIServer() - apiserver_v1.daemon = True - apiserver_v1.start() + self.apiserver_v1 = apiv1.APIServer() + self.apiserver_v1.daemon = True + self.apiserver_v1.start() # delete blocks with no ledger hashes # in case of reparse interrupted - blocks.rollback_empty_block(db) + blocks.rollback_empty_block(self.db) # Asset conservation checker if config.CHECK_ASSET_CONSERVATION: - asset_conservation_checker = AssetConservationChecker() - asset_conservation_checker.start() + self.asset_conservation_checker = AssetConservationChecker() + self.asset_conservation_checker.start() # Reset (delete) rust fetcher database blocks.reset_rust_fetcher_database() # catch up - blocks.catch_up(db) + blocks.catch_up(self.db, self.api_stop_event) # Blockchain watcher logger.info("Watching for new blocks...") - follower_daemon = follow.start_blockchain_watcher(db) - follower_daemon.start() + self.follower_daemon = follow.start_blockchain_watcher(self.db) + self.follower_daemon.start() + + def stop(self): + logger.info("Shutting down...") + CurrentState().set_block_parser_status("Stopping") + + # Ensure all threads are stopped + if self.follower_daemon: + self.follower_daemon.stop() + if self.asset_conservation_checker: + self.asset_conservation_checker.stop() + if self.backend_height_thread: + self.backend_height_thread.stop() + if self.api_stop_event: + self.api_stop_event.set() + if self.api_status_poller: + self.api_status_poller.stop() + if self.apiserver_v1: + self.apiserver_v1.stop() + if self.apiserver_v2: + self.apiserver_v2.stop() + + logger.info("Shutdown complete.") + +def start_all(args, log_stream=None): + server = CounterpartyServer(args, log_stream) + try: + server.start() + while True: + server.join(1) except KeyboardInterrupt: logger.warning("Keyboard interrupt received. Shutting down...") - pass except Exception as e: logger.error("Exception caught!", exc_info=e) finally: - # Ensure all threads are stopped - if backend_height_thread: - backend_height_thread.stop() - if api_stop_event: - api_stop_event.set() - if api_status_poller: - api_status_poller.stop() - if apiserver_v1: - apiserver_v1.stop() - if follower_daemon: - follower_daemon.stop() - if asset_conservation_checker: - asset_conservation_checker.stop() - - if apiserver_v2: - logger.info("Waiting for API processes to stop...") - apiserver_v2.stop() - while not apiserver_v2.has_stopped(): - time.sleep(0.1) - - # then close the database with write access - if db: - database.close(db) - - # Now it's safe to check for WAL files - for db_name, db_path in [ - ("Ledger DB", config.DATABASE), - ("State DB", config.STATE_DATABASE), - ]: - try: - database.check_wal_file(db_path) - except exceptions.WALFileFoundError: - db_file = config.DATABASE if db_name == "Ledger DB" else config.STATE_DATABASE - db = database.get_db_connection(db_file, read_only=False, check_wal=False) - db.close() - except exceptions.DatabaseError: - logger.warning( - f"{db_name} is in use by another process and was unable to be closed correctly." - ) - - log.shutdown() - logger.info("Shutdown complete.") + server.stop() def reparse(block_index): diff --git a/counterparty-core/counterpartycore/lib/parser/blocks.py b/counterparty-core/counterpartycore/lib/parser/blocks.py index 81f9f28dec..77991e6e8e 100644 --- a/counterparty-core/counterpartycore/lib/parser/blocks.py +++ b/counterparty-core/counterpartycore/lib/parser/blocks.py @@ -890,6 +890,8 @@ def catch_up(db, check_asset_conservation=True): parsed_blocks = 0 while CurrentState().current_block_index() < block_count: + if CurrentState().block_parser_status() == "Stopping": + return # Get block information and transactions fetch_time_start = time.time() if fetcher is None: diff --git a/counterparty-core/counterpartycore/lib/parser/follow.py b/counterparty-core/counterpartycore/lib/parser/follow.py index 95b12e756a..48ad570e9b 100644 --- a/counterparty-core/counterpartycore/lib/parser/follow.py +++ b/counterparty-core/counterpartycore/lib/parser/follow.py @@ -91,8 +91,10 @@ def __init__(self, db): self.last_software_version_check_time = 0 self.last_mempool_parsing_time = 0 # catch up and clean mempool before starting + self.stop_event = threading.Event() self.mempool_parser = None if not config.NO_MEMPOOL: + CurrentState().set_block_parser_status("Initializing") mempool.clean_mempool(self.db) self.mempool_parser = RawMempoolParser(self.db) self.mempool_parser.start() @@ -255,7 +257,7 @@ async def handle(self): self.check_software_version_if_needed() late_since = None - while True: + while True and not self.stop_event.is_set(): try: if not config.NO_MEMPOOL: if len(RAW_MEMPOOL) > 0: @@ -294,9 +296,6 @@ async def handle(self): break # Exit the loop if the task is cancelled except Exception as e: logger.error("Error in handle loop: %s", e) - import traceback - - print(traceback.format_exc()) # for debugging capture_exception(e) self.stop() break # Optionally break the loop on other exceptions @@ -305,14 +304,17 @@ def start(self): logger.debug("Starting blockchain watcher...") # Schedule the handle coroutine once self.task = self.loop.create_task(self.handle()) - self.loop.run_forever() + try: + self.loop.run_forever() + finally: + self.loop.close() def stop(self): logger.debug("Stopping blockchain watcher...") # Cancel the handle task + self.stop_event.set() self.task.cancel() self.loop.stop() - self.loop.close() # Clean up ZMQ context self.zmq_context.destroy() # Stop mempool parser @@ -348,13 +350,15 @@ def get_raw_mempool(db): class RawMempoolParser(threading.Thread): def __init__(self, db): threading.Thread.__init__(self, name="RawMempoolParser") + self.db = db self.daemon = True - self.tx_hashes_chunks, self.timestamps = get_raw_mempool(db) self.stop_event = threading.Event() + self.tx_hashes_chunks, self.timestamps = get_raw_mempool(self.db) def run(self): logger.debug("Starting RawMempoolParser...") start = time.time() + counter = 0 while len(self.tx_hashes_chunks) > 0 and not self.stop_event.is_set(): txhash_list = self.tx_hashes_chunks.pop(0) @@ -370,10 +374,11 @@ def run(self): ) def stop(self): - if self.is_alive(): - logger.debug("Stopping RawMempoolParser...") - self.stop_event.set() - self.join() + logger.debug("Stopping RawMempoolParser...") + self.db.interrupt() + # if self.is_alive(): + self.stop_event.set() + self.join() class NotSupportedTransactionsCache(metaclass=helpers.SingletonMeta): diff --git a/counterparty-core/counterpartycore/lib/utils/database.py b/counterparty-core/counterpartycore/lib/utils/database.py index fcbc5914dd..cab579a9eb 100644 --- a/counterparty-core/counterpartycore/lib/utils/database.py +++ b/counterparty-core/counterpartycore/lib/utils/database.py @@ -159,7 +159,7 @@ def initialise_db(): cprint("THE OPTION `--force` IS NOT FOR USE ON PRODUCTION SYSTEMS.", "yellow") # Database - logger.info(f"Connecting to database... (SQLite {apsw.apswversion()})") + logger.debug(f"Connecting to database... (SQLite {apsw.apswversion()})") db = get_connection(read_only=False) return db diff --git a/counterparty-core/counterpartycore/test/integrations/http2https.py b/counterparty-core/counterpartycore/test/integrations/http2https.py index da244fae72..4b05b1f57c 100644 --- a/counterparty-core/counterpartycore/test/integrations/http2https.py +++ b/counterparty-core/counterpartycore/test/integrations/http2https.py @@ -38,7 +38,7 @@ def handle(self): data=body, headers={"Content-Type": "application/json"}, verify=False, - timeout=5, + timeout=15, ) # Send response back to client @@ -66,6 +66,7 @@ class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): def __init__(self, server_address, RequestHandlerClass, target_url): self.target_url = target_url + print("Proxying to", target_url) # Create a session to reuse TCP connection self.session = requests.Session() diff --git a/counterparty-core/counterpartycore/test/integrations/load_test.py b/counterparty-core/counterpartycore/test/integrations/load_test.py index cf079099e8..36dd05ea57 100644 --- a/counterparty-core/counterpartycore/test/integrations/load_test.py +++ b/counterparty-core/counterpartycore/test/integrations/load_test.py @@ -1,284 +1,14 @@ -import random -import urllib.parse +import time from io import StringIO -import gevent -import locust -import reparsetest -from counterpartycore.lib.api.routes import ALL_ROUTES -from counterpartycore.lib.utils import database - - -def generate_mainnet_fixtures(db_file): - db = database.get_db_connection(db_file, read_only=True) - - class MainnetFixtures: - last_block = db.execute( - "SELECT block_hash FROM blocks ORDER BY block_index DESC LIMIT 1" - ).fetchone() - last_tx = db.execute( - "SELECT tx_hash, tx_index, block_index FROM transactions ORDER BY rowid DESC LIMIT 1" - ).fetchone() - utxo_with_balance = db.execute( - "SELECT * FROM balances WHERE utxo IS NOT null AND quantity > 0 ORDER BY rowid DESC LIMIT 1" - ).fetchone() - last_dispenser = db.execute( - "SELECT * FROM dispensers ORDER BY rowid DESC LIMIT 1" - ).fetchone() - last_dispense = db.execute("SELECT * FROM dispenses ORDER BY rowid DESC LIMIT 1").fetchone() - last_order = db.execute("SELECT * FROM orders ORDER BY rowid DESC LIMIT 1").fetchone() - last_bet = db.execute("SELECT * FROM bets ORDER BY rowid DESC LIMIT 1").fetchone() - last_dividend = db.execute("SELECT * FROM dividends ORDER BY rowid DESC LIMIT 1").fetchone() - last_event = db.execute("SELECT * FROM messages ORDER BY rowid DESC LIMIT 1").fetchone() - last_issuance = db.execute("SELECT * FROM issuances ORDER BY rowid DESC LIMIT 1").fetchone() - last_sweep = db.execute("SELECT * FROM sweeps ORDER BY rowid DESC LIMIT 1").fetchone() - last_broadcast = db.execute( - "SELECT * FROM broadcasts ORDER BY rowid DESC LIMIT 1" - ).fetchone() - last_fairminter = db.execute( - "SELECT * FROM fairminters ORDER BY rowid DESC LIMIT 1" - ).fetchone() - last_fairmint = db.execute("SELECT * FROM fairmints ORDER BY rowid DESC LIMIT 1").fetchone() - asset, asset1, asset2 = "XCP", "PEPECASH", "FAIREST" - datahex = "00000014000000a25be34b66000000174876e800010000000000000000000f446976697369626c65206173736574" - jdog_address = "1JDogZS6tQcSxwfxhv6XKKjcyicYA4Feev" - jdog_tx_hash = "032d29f789f7fc0aa8d268431a02001a0d4ee9dc42ca4b21de26b912f101271c" - raw_transaction = "0100000001b43530bc300f44a078bae943cb6ad3add44111ce2f815dad1deb921c912462d9020000008b483045022100849a06573b994a95b239cbaadf8cd266bdc5fc64535be43bcb786e29b515089502200a6fd9876ef888b67f1097928f7386f55e775b5812eb9ba22609abfdfe8d3f2f01410426156245525daa71f2e84a40797bcf28099a2c508662a8a33324a703597b9aa2661a79a82ffb4caaa9b15f4094622fbfa85f8b9dc7381f991f5a265421391cc3ffffffff020000000000000000436a4145b0b98d99423f507895e7dbdf4b7973f7bd422984872c56c241007de56d991ce1c74270f773cf03896f4a50ee0df5eb153571ce2a767f51c7c9d7569bad277e9da9a78200000000001976a914bce6191bf2fd5981313cae869e9fafe164f7dbaf88ac00000000" - compose_args = { - "/v2/addresses/
/compose/bet": { - "feed_address": "1JDogZS6tQcSxwfxhv6XKKjcyicYA4Feev", - "bet_type": 3, - "deadline": 1388000200, - "wager_quantity": 10, - "counterwager_quantity": 10, - "target_value": 0, - "leverage": 5040, - "expiration": 1000, - }, - "/v2/addresses/
/compose/broadcast": { - "timestamp": 1388000002, - "value": 1, - "fee_fraction": 0.05, - "text": "Load Test", - }, - "/v2/addresses/
/compose/btcpay": None, - "/v2/addresses/
/compose/burn": None, - "/v2/addresses/
/compose/cancel": None, - "/v2/addresses/
/compose/destroy": { - "asset": "XCP", - "quantity": 1, - "tag": "string", - }, - "/v2/addresses/
/compose/dispenser": { - "asset": "XCP", - "give_quantity": 100, - "escrow_quantity": 100, - "mainchainrate": 100, - "status": 0, - }, - "/v2/addresses/
/compose/dividend": { - "quantity_per_unit": 1, - "asset": "A4931122120200000000", - "dividend_asset": "XCP", - }, - "/v2/addresses/
/compose/dividend/estimatexcpfees": { - "quantity_per_unit": 1, - "asset": "A4931122120200000000", - "dividend_asset": "XCP", - }, - "/v2/addresses/
/compose/issuance": { - "asset": "DAVASABLE", - "quantity": 10000000000, - "transfer_destination": None, - "divisible": True, - "lock": None, - "reset": None, - "description": "Divisible asset", - }, - "/v2/addresses/
/compose/mpma": { - "assets": "XCP,A4931122120200000000", - "destinations": "1CounterpartyXXXXXXXXXXXXXXXUWLpVr,1CounterpartyXXXXXXXXXXXXXXXUWLpVr", - "quantities": "1,1", - }, - "/v2/addresses/
/compose/order": { - "give_asset": "XCP", - "give_quantity": 1, - "get_asset": "A4931122120200000000", - "get_quantity": 1, - "expiration": 2000, - "fee_required": 0, - }, - "/v2/addresses/
/compose/send": { - "asset": "XCP", - "quantity": 100, - "destination": "1CounterpartyXXXXXXXXXXXXXXXUWLpVr", - }, - "/v2/addresses/
/compose/sweep": { - "destination": "1CounterpartyXXXXXXXXXXXXXXXUWLpVr", - "flags": 7, - "memo": "aa", - }, - "/v2/addresses/
/compose/sweep/estimatexcpfees": { - "destination": "1CounterpartyXXXXXXXXXXXXXXXUWLpVr", - "flags": 7, - "memo": "aa", - }, - "/v2/addresses/
/compose/dispense": { - "dispenser": last_dispenser["source"], - "quantity": 1, - }, - "/v2/addresses/
/compose/fairminter": { - "asset": "LOADTEST", - "max_mint_per_tx": 100, - }, - "/v2/addresses/
/compose/fairmint": { - "asset": last_fairminter["asset"], - }, - "/v2/addresses/
/compose/attach": { - "asset": "XCP", - "quantity": 100, - }, - "/v2/addresses/
/compose/attach/estimatexcpfees": {}, - "/v2/utxos//compose/detach": {}, - "/v2/utxos//compose/movetoutxo": { - "destination": "1JDogZS6tQcSxwfxhv6XKKjcyicYA4Feev", - "inputs_set": f"{utxo_with_balance['utxo']}:10000", - }, - } - compose_common_args = { - "validate": "false", - "pubkeys": "0426156245525daa71f2e84a40797bcf28099a2c508662a8a33324a703597b9aa2661a79a82ffb4caaa9b15f4094622fbfa85f8b9dc7381f991f5a265421391cc3", - "exact_fee": 0, - "disable_utxo_locks": "true", - } - - db.close() - return MainnetFixtures - - -def random_offset(): - return random.randint(1, 10000) # noqa S311 # Example of random range - - -def random_limit(): - return random.randint(1, 1000) # noqa S311 - - -def random_verbose(): - return random.choice(["true", "false"]) # noqa S311 - - -def random_params(): - return "&".join( - [ - urllib.parse.urlencode({"offset": random_offset()}), - urllib.parse.urlencode({"limit": random_limit()}), - urllib.parse.urlencode({"verbose": random_verbose()}), - ] - ) - - -def prepare_url(route, MainnetFixtures): - # exclude broadcast signed tx and API v1 - if route in ["/v2/bitcoin/transactions", "/", "/v1/", "/api/", "/rpc/"]: - return None - - url = route.replace("", str(MainnetFixtures.last_tx["block_index"])) - url = url.replace("", MainnetFixtures.last_block["block_hash"]) - url = url.replace("", MainnetFixtures.last_order["tx_hash"]) - url = url.replace("", MainnetFixtures.last_dispenser["tx_hash"]) - url = url.replace("", MainnetFixtures.last_bet["tx_hash"]) - url = url.replace("", MainnetFixtures.last_dividend["tx_hash"]) - url = url.replace("", str(MainnetFixtures.last_tx["tx_index"])) - url = url.replace("", str(MainnetFixtures.last_event["message_index"])) - url = url.replace("", str(MainnetFixtures.last_event["event"])) - url = url.replace("", MainnetFixtures.asset) - url = url.replace("", MainnetFixtures.asset1) - url = url.replace("", MainnetFixtures.asset2) - url = url.replace("
", MainnetFixtures.jdog_address) - url = url.replace("", MainnetFixtures.utxo_with_balance["utxo"]) - - if url == "/v2/transactions//info": - url = url.replace("", MainnetFixtures.jdog_tx_hash) - elif url.startswith("/v2/issuances/"): - url = url.replace("", MainnetFixtures.last_issuance["tx_hash"]) - elif url.startswith("/v2/sweeps/"): - url = url.replace("", MainnetFixtures.last_sweep["tx_hash"]) - elif url.startswith("/v2/broadcasts/"): - url = url.replace("", MainnetFixtures.last_broadcast["tx_hash"]) - elif url.startswith("/v2/fairminters/"): - url = url.replace("", MainnetFixtures.last_fairminter["tx_hash"]) - elif url.startswith("/v2/fairmints/"): - url = url.replace("", MainnetFixtures.last_fairmint["tx_hash"]) - elif url.startswith("/v2/dispenses/"): - url = url.replace("", MainnetFixtures.last_dispense["tx_hash"]) - else: - url = url.replace("", MainnetFixtures.last_tx["tx_hash"]) - - if url == "/v2/transactions/info": - url = url + "?rawtransaction=" + MainnetFixtures.raw_transaction - elif url == "/v2/bitcoin/transactions/decode": - url = url + "?rawtx=" + MainnetFixtures.raw_transaction - elif url == "/v2/transactions/unpack": - url = url + "?datahex=" + MainnetFixtures.datahex - elif url in [ - "/v2/addresses/balances", - "/v2/addresses/transactions", - "/v2/addresses/events", - "/v2/addresses/mempool", - "/v2/bitcoin/addresses/utxos", - ]: - url = url + "?addresses=" + MainnetFixtures.jdog_address - elif url == "/v2/utxos/withbalances": - url = url + "?utxos=" + MainnetFixtures.utxo_with_balance["utxo"] - - if "/compose/" in route: - if MainnetFixtures.compose_args.get(route): - params = MainnetFixtures.compose_args[route] | MainnetFixtures.compose_common_args - query_string = [] - for key, value in params.items(): - if not isinstance(value, list): - query_string.append(urllib.parse.urlencode({key: value})) - else: - for i in range(len(value)): - query_string.append(urllib.parse.urlencode({key: value[i]})) - query_string = "&".join(query_string) - url = url + "?" + query_string - else: - return None - - chr = "&" if "?" in url else "?" - url = url + chr + random_params() - - return url - - -def generate_random_url(MainnetFixtures): - while True: - url = prepare_url(random.choice(list(ALL_ROUTES.keys())), MainnetFixtures) # noqa S311 - if url: - return url - - -class CounterpartyCoreUser(locust.HttpUser): - host = "http://localhost:4000" # Counterparty API URL - wait_time = locust.between(0.5, 1) - network_timeout = 15.0 - connection_timeout = 15.0 - MainnetFixtures = None - - @locust.task - def get_random_url(self): - headers = {"Content-Type": "application/json"} - self.client.get(generate_random_url(CounterpartyCoreUser.MainnetFixtures), headers=headers) +from counterpartycore.test.integrations import reparsetest +from counterpartycore.test.integrations.locustrunner import run_locust def test_load(): sh_counterparty_server, backend_url, db_file, api_url = reparsetest.prepare("mainnet") sh_counterparty_server("bootstrap") - CounterpartyCoreUser.MainnetFixtures = generate_mainnet_fixtures(db_file) - try: out = StringIO() server_process = sh_counterparty_server( @@ -298,26 +28,9 @@ def test_load(): while "API.Watcher - Catch up completed" not in out.getvalue(): print("Waiting for server to be ready...") - gevent.sleep(1) - - locust.log.setup_logging("INFO") - - user_count = 4 - spawn_rate = 2 - test_duration = 60 * 5 # 5 minutes - - env = locust.env.Environment(user_classes=[CounterpartyCoreUser]) - env.create_local_runner() + time.sleep(1) - # start a greenlet that periodically outputs the current stats - gevent.spawn(locust.stats.stats_printer(env.stats)) - # start a greenlet that save current stats to history - gevent.spawn(locust.stats.stats_history, env.runner) - # start the test - env.runner.start(user_count, spawn_rate=spawn_rate) - # in test_duration seconds stop the runner - gevent.spawn_later(test_duration, lambda: env.runner.quit()) - env.runner.greenlet.join() + env = run_locust(db_file) print(env.stats.serialize_errors()) assert env.stats.total.num_failures == 0 diff --git a/counterparty-core/counterpartycore/test/integrations/locustrunner.py b/counterparty-core/counterpartycore/test/integrations/locustrunner.py new file mode 100644 index 0000000000..d7a0a1c393 --- /dev/null +++ b/counterparty-core/counterpartycore/test/integrations/locustrunner.py @@ -0,0 +1,319 @@ +import os +import random +import urllib.parse + +import gevent +import locust +from counterpartycore.lib.api.routes import ALL_ROUTES +from counterpartycore.lib.utils import database + + +def generate_mainnet_fixtures(db_file): + db = database.get_db_connection(db_file, read_only=True) + + class MainnetFixtures: + last_block = db.execute( + "SELECT block_hash FROM blocks ORDER BY block_index DESC LIMIT 1" + ).fetchone() + last_tx = db.execute( + "SELECT tx_hash, tx_index, block_index FROM transactions ORDER BY rowid DESC LIMIT 1" + ).fetchone() + utxo_with_balance = db.execute( + "SELECT * FROM balances WHERE utxo IS NOT null AND quantity > 0 ORDER BY rowid DESC LIMIT 1" + ).fetchone() + last_dispenser = db.execute( + "SELECT * FROM dispensers ORDER BY rowid DESC LIMIT 1" + ).fetchone() + last_dispense = db.execute("SELECT * FROM dispenses ORDER BY rowid DESC LIMIT 1").fetchone() + last_order = db.execute("SELECT * FROM orders ORDER BY rowid DESC LIMIT 1").fetchone() + last_bet = db.execute("SELECT * FROM bets ORDER BY rowid DESC LIMIT 1").fetchone() + last_dividend = db.execute("SELECT * FROM dividends ORDER BY rowid DESC LIMIT 1").fetchone() + last_event = db.execute("SELECT * FROM messages ORDER BY rowid DESC LIMIT 1").fetchone() + last_issuance = db.execute("SELECT * FROM issuances ORDER BY rowid DESC LIMIT 1").fetchone() + last_sweep = db.execute("SELECT * FROM sweeps ORDER BY rowid DESC LIMIT 1").fetchone() + last_broadcast = db.execute( + "SELECT * FROM broadcasts ORDER BY rowid DESC LIMIT 1" + ).fetchone() + last_fairminter = db.execute( + "SELECT * FROM fairminters ORDER BY rowid DESC LIMIT 1" + ).fetchone() + last_fairmint = db.execute("SELECT * FROM fairmints ORDER BY rowid DESC LIMIT 1").fetchone() + asset, asset1, asset2 = "XCP", "PEPECASH", "FAIREST" + datahex = "00000014000000a25be34b66000000174876e800010000000000000000000f446976697369626c65206173736574" + jdog_address = "1JDogZS6tQcSxwfxhv6XKKjcyicYA4Feev" + jdog_tx_hash = "032d29f789f7fc0aa8d268431a02001a0d4ee9dc42ca4b21de26b912f101271c" + raw_transaction = "0100000001b43530bc300f44a078bae943cb6ad3add44111ce2f815dad1deb921c912462d9020000008b483045022100849a06573b994a95b239cbaadf8cd266bdc5fc64535be43bcb786e29b515089502200a6fd9876ef888b67f1097928f7386f55e775b5812eb9ba22609abfdfe8d3f2f01410426156245525daa71f2e84a40797bcf28099a2c508662a8a33324a703597b9aa2661a79a82ffb4caaa9b15f4094622fbfa85f8b9dc7381f991f5a265421391cc3ffffffff020000000000000000436a4145b0b98d99423f507895e7dbdf4b7973f7bd422984872c56c241007de56d991ce1c74270f773cf03896f4a50ee0df5eb153571ce2a767f51c7c9d7569bad277e9da9a78200000000001976a914bce6191bf2fd5981313cae869e9fafe164f7dbaf88ac00000000" + compose_args = { + "/v2/addresses/
/compose/bet": { + "feed_address": "1JDogZS6tQcSxwfxhv6XKKjcyicYA4Feev", + "bet_type": 3, + "deadline": 1388000200, + "wager_quantity": 10, + "counterwager_quantity": 10, + "target_value": 0, + "leverage": 5040, + "expiration": 1000, + }, + "/v2/addresses/
/compose/broadcast": { + "timestamp": 1388000002, + "value": 1, + "fee_fraction": 0.05, + "text": "Load Test", + }, + "/v2/addresses/
/compose/btcpay": None, + "/v2/addresses/
/compose/burn": None, + "/v2/addresses/
/compose/cancel": None, + "/v2/addresses/
/compose/destroy": { + "asset": "XCP", + "quantity": 1, + "tag": "string", + }, + "/v2/addresses/
/compose/dispenser": { + "asset": "XCP", + "give_quantity": 100, + "escrow_quantity": 100, + "mainchainrate": 100, + "status": 0, + }, + "/v2/addresses/
/compose/dividend": { + "quantity_per_unit": 1, + "asset": "A4931122120200000000", + "dividend_asset": "XCP", + }, + "/v2/addresses/
/compose/dividend/estimatexcpfees": { + "quantity_per_unit": 1, + "asset": "A4931122120200000000", + "dividend_asset": "XCP", + }, + "/v2/addresses/
/compose/issuance": { + "asset": "DAVASABLE", + "quantity": 10000000000, + "transfer_destination": None, + "divisible": True, + "lock": None, + "reset": None, + "description": "Divisible asset", + }, + "/v2/addresses/
/compose/mpma": { + "assets": "XCP,A4931122120200000000", + "destinations": "1CounterpartyXXXXXXXXXXXXXXXUWLpVr,1CounterpartyXXXXXXXXXXXXXXXUWLpVr", + "quantities": "1,1", + }, + "/v2/addresses/
/compose/order": { + "give_asset": "XCP", + "give_quantity": 1, + "get_asset": "A4931122120200000000", + "get_quantity": 1, + "expiration": 2000, + "fee_required": 0, + }, + "/v2/addresses/
/compose/send": { + "asset": "XCP", + "quantity": 100, + "destination": "1CounterpartyXXXXXXXXXXXXXXXUWLpVr", + }, + "/v2/addresses/
/compose/sweep": { + "destination": "1CounterpartyXXXXXXXXXXXXXXXUWLpVr", + "flags": 7, + "memo": "aa", + }, + "/v2/addresses/
/compose/sweep/estimatexcpfees": { + "destination": "1CounterpartyXXXXXXXXXXXXXXXUWLpVr", + "flags": 7, + "memo": "aa", + }, + "/v2/addresses/
/compose/dispense": { + "dispenser": last_dispenser["source"], + "quantity": 1, + }, + "/v2/addresses/
/compose/fairminter": { + "asset": "LOADTEST", + "max_mint_per_tx": 100, + }, + "/v2/addresses/
/compose/fairmint": { + "asset": last_fairminter["asset"], + }, + "/v2/addresses/
/compose/attach": { + "asset": "XCP", + "quantity": 100, + }, + "/v2/addresses/
/compose/attach/estimatexcpfees": {}, + "/v2/utxos//compose/detach": {}, + "/v2/utxos//compose/movetoutxo": { + "destination": "1JDogZS6tQcSxwfxhv6XKKjcyicYA4Feev", + "inputs_set": f"{utxo_with_balance['utxo']}:10000", + }, + } + compose_common_args = { + "validate": "false", + "pubkeys": "0426156245525daa71f2e84a40797bcf28099a2c508662a8a33324a703597b9aa2661a79a82ffb4caaa9b15f4094622fbfa85f8b9dc7381f991f5a265421391cc3", + "exact_fee": 0, + "disable_utxo_locks": "true", + } + + db.close() + return MainnetFixtures + + +def random_offset(): + return random.randint(1, 10000) # noqa S311 # Example of random range + + +def random_limit(): + return random.randint(1, 1000) # noqa S311 + + +def random_verbose(): + return random.choice(["true", "false"]) # noqa S311 + + +def random_params(): + return "&".join( + [ + urllib.parse.urlencode({"offset": random_offset()}), + urllib.parse.urlencode({"limit": random_limit()}), + urllib.parse.urlencode({"verbose": random_verbose()}), + ] + ) + + +def prepare_url(route, MainnetFixtures): + # exclude broadcast signed tx and API v1 + if route in ["/v2/bitcoin/transactions", "/", "/v1/", "/api/", "/rpc/"]: + return None + + url = route.replace("", str(MainnetFixtures.last_tx["block_index"])) + url = url.replace("", MainnetFixtures.last_block["block_hash"]) + url = url.replace("", MainnetFixtures.last_order["tx_hash"]) + url = url.replace("", MainnetFixtures.last_dispenser["tx_hash"]) + url = url.replace("", MainnetFixtures.last_bet["tx_hash"]) + url = url.replace("", MainnetFixtures.last_dividend["tx_hash"]) + url = url.replace("", str(MainnetFixtures.last_tx["tx_index"])) + url = url.replace("", str(MainnetFixtures.last_event["message_index"])) + url = url.replace("", str(MainnetFixtures.last_event["event"])) + url = url.replace("", MainnetFixtures.asset) + url = url.replace("", MainnetFixtures.asset1) + url = url.replace("", MainnetFixtures.asset2) + url = url.replace("
", MainnetFixtures.jdog_address) + url = url.replace("", MainnetFixtures.utxo_with_balance["utxo"]) + + if url == "/v2/transactions//info": + url = url.replace("", MainnetFixtures.jdog_tx_hash) + elif url.startswith("/v2/issuances/"): + url = url.replace("", MainnetFixtures.last_issuance["tx_hash"]) + elif url.startswith("/v2/sweeps/"): + url = url.replace("", MainnetFixtures.last_sweep["tx_hash"]) + elif url.startswith("/v2/broadcasts/"): + url = url.replace("", MainnetFixtures.last_broadcast["tx_hash"]) + elif url.startswith("/v2/fairminters/"): + url = url.replace("", MainnetFixtures.last_fairminter["tx_hash"]) + elif url.startswith("/v2/fairmints/"): + url = url.replace("", MainnetFixtures.last_fairmint["tx_hash"]) + elif url.startswith("/v2/dispenses/"): + url = url.replace("", MainnetFixtures.last_dispense["tx_hash"]) + else: + url = url.replace("", MainnetFixtures.last_tx["tx_hash"]) + + if url == "/v2/transactions/info": + url = url + "?rawtransaction=" + MainnetFixtures.raw_transaction + elif url == "/v2/bitcoin/transactions/decode": + url = url + "?rawtx=" + MainnetFixtures.raw_transaction + elif url == "/v2/transactions/unpack": + url = url + "?datahex=" + MainnetFixtures.datahex + elif url in [ + "/v2/addresses/balances", + "/v2/addresses/transactions", + "/v2/addresses/events", + "/v2/addresses/mempool", + "/v2/bitcoin/addresses/utxos", + ]: + url = url + "?addresses=" + MainnetFixtures.jdog_address + elif url == "/v2/utxos/withbalances": + url = url + "?utxos=" + MainnetFixtures.utxo_with_balance["utxo"] + + if "/compose/" in route: + if MainnetFixtures.compose_args.get(route): + params = MainnetFixtures.compose_args[route] | MainnetFixtures.compose_common_args + query_string = [] + for key, value in params.items(): + if not isinstance(value, list): + query_string.append(urllib.parse.urlencode({key: value})) + else: + for i in range(len(value)): + query_string.append(urllib.parse.urlencode({key: value[i]})) + query_string = "&".join(query_string) + url = url + "?" + query_string + else: + return None + + chr = "&" if "?" in url else "?" + url = url + chr + random_params() + + return url + + +def generate_random_url(MainnetFixtures): + while True: + url = prepare_url(random.choice(list(ALL_ROUTES.keys())), MainnetFixtures) # noqa S311 + if url: + return url + + +class CounterpartyCoreUser(locust.HttpUser): + host = "http://localhost:4000" # Counterparty API URL + wait_time = locust.between(0.5, 1) + network_timeout = 15.0 + connection_timeout = 15.0 + MainnetFixtures = None + + @locust.task + def get_random_url(self): + headers = {"Content-Type": "application/json"} + self.client.get(generate_random_url(CounterpartyCoreUser.MainnetFixtures), headers=headers) + + +def run_locust(db_file, duration=300, wait_time=None, user_count=4, stats_printer=True): + CounterpartyCoreUser.MainnetFixtures = generate_mainnet_fixtures(db_file) + CounterpartyCoreUser.wait_time = wait_time or locust.between(0.5, 1) + + locust.log.setup_logging("INFO") + + spawn_rate = 2 + test_duration = 60 * 5 # 5 minutes + + env = locust.env.Environment(user_classes=[CounterpartyCoreUser]) + env.create_local_runner() + + try: + web_ui = env.create_web_ui("127.0.0.1", 8089) + + # start a greenlet that periodically outputs the current stats + if stats_printer: + gevent.spawn(locust.stats.stats_printer(env.stats)) + # start a greenlet that save current stats to history + gevent.spawn(locust.stats.stats_history, env.runner) + # start the test + env.runner.start(user_count, spawn_rate=spawn_rate) + # in test_duration seconds stop the runner + if duration: + gevent.spawn_later(test_duration, lambda: env.runner.quit()) + + env.runner.greenlet.join() + except KeyboardInterrupt: + pass + finally: + if duration is None: + env.runner.quit() + web_ui.stop() + + return env + + +if __name__ == "__main__": + run_locust( + os.path.expanduser("~/.local/share/counterparty/counterparty.db"), + duration=None, + wait_time=locust.between(0.1, 0.3), + user_count=5, + stats_printer=False, + ) diff --git a/counterparty-core/counterpartycore/test/integrations/reparsetest.py b/counterparty-core/counterpartycore/test/integrations/reparsetest.py index 68beb9a121..a2e5eec344 100644 --- a/counterparty-core/counterpartycore/test/integrations/reparsetest.py +++ b/counterparty-core/counterpartycore/test/integrations/reparsetest.py @@ -9,7 +9,7 @@ from http2https import PROXY_PORT, start_http_proxy, stop_http_proxy # DATA_DIR = os.path.join(tempfile.gettempdir(), "counterparty-data") -DATA_DIR = os.path.join(os.path.expanduser("~/.cache"), "counterparty-data") +DATA_DIR = os.path.join(os.path.expanduser("~/.cache"), "counterparty-test-data") def prepare(network): diff --git a/counterparty-core/counterpartycore/test/integrations/shutdown_test.py b/counterparty-core/counterpartycore/test/integrations/shutdown_test.py new file mode 100644 index 0000000000..2c8c7d15f6 --- /dev/null +++ b/counterparty-core/counterpartycore/test/integrations/shutdown_test.py @@ -0,0 +1,76 @@ +import random +import socket +import time +from io import StringIO + +from counterpartycore.lib.cli import server +from counterpartycore.lib.cli.main import arg_parser +from counterpartycore.test.integrations import reparsetest +from http2https import PROXY_PORT, start_http_proxy, stop_http_proxy + + +def is_port_in_used(port): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.bind(("127.0.0.1", port)) + return False + except socket.error: + return True + finally: + s.close() + + +def test_shutdown(): + sh_counterparty_server, backend_url, db_file, api_url = reparsetest.prepare("testnet4") + + try: + start_http_proxy(backend_url) + + parser = arg_parser(no_config_file=True) + args = parser.parse_args( + [ + "--testnet4", + "--data-dir", + reparsetest.DATA_DIR, + "--cache-dir", + reparsetest.DATA_DIR, + "start", + "--backend-connect", + "127.0.0.1", + "--backend-port", + f"{PROXY_PORT}", + "--wsgi-server", + "gunicorn", + ] + ) + + log_stream = StringIO() + server.initialise_log_and_config(args, log_stream=log_stream) + + test_duration = random.randint(1, 60) # noqa S311 + start_time = time.time() + + print("Test duration: ", test_duration) + + counterparty_server = server.CounterpartyServer(args, log_stream) + counterparty_server.start() + while time.time() - start_time < test_duration: + counterparty_server.join(1) + + assert is_port_in_used(44000) + + finally: + print("Shutting down server...") + counterparty_server.stop() + stop_http_proxy() + + logs = log_stream.getvalue() + + assert "Ledger.Main - Shutting down..." in logs + assert "Ledger.Main - Asset Conservation Checker thread stopped." in logs + assert "Ledger.BackendHeight - BackendHeight Thread stopped." in logs + assert "Ledger.Main - API Server v1 thread stopped." in logs + assert "Ledger.Main - API Server process stopped." in logs + assert "Ledger.Main - Shutdown complete." in logs + + assert not is_port_in_used(44000) diff --git a/counterparty-core/tools/comparebalances.py b/counterparty-core/tools/comparebalances.py new file mode 100644 index 0000000000..8a5612ce2e --- /dev/null +++ b/counterparty-core/tools/comparebalances.py @@ -0,0 +1,51 @@ +#!/usr/bin/python3 + + +import apsw + +db_ok = apsw.Connection( + "/home/ouziel/.local/share/counterparty/counterparty.db", flags=apsw.SQLITE_OPEN_READONLY +) +db_nok = apsw.Connection( + "/home/ouziel/.local/share/counterparty/asset_conservation.db", flags=apsw.SQLITE_OPEN_READONLY +) + + +sql = """ + SELECT address, asset, quantity, (address || asset) AS aa, MAX(rowid) + FROM balances + WHERE address IS NOT NULL AND utxo IS NULL AND asset = 'XCP' AND quantity > 0 + GROUP BY aa +""" + +cursor_ok = db_ok.cursor() +cursor_nok = db_nok.cursor() + +cursor_ok.execute(sql) +cursor_nok.execute(sql) + +balances_ok = cursor_ok.fetchall() +balances_nok = cursor_nok.fetchall() + +print(len(balances_ok)) +print(len(balances_nok)) + +balances_by_address_ok = {} +for balance in balances_ok: + address = balance[0] + quantity = balance[2] + balances_by_address_ok[address] = quantity + +balances_by_address_nok = {} +for balance in balances_nok: + address = balance[0] + quantity = balance[2] + balances_by_address_nok[address] = quantity + +for address, balance in balances_by_address_ok.items(): + if address not in balances_by_address_nok: + print(f"Address {address} not found in asset_conservation.db") + elif balance != balances_by_address_nok[address]: + print( + f"Address {address} has different balance in asset_conservation.db: {balance} != {balances_by_address_nok[address]}" + ) diff --git a/counterparty-core/tools/compareledger.py b/counterparty-core/tools/compareledger.py index bc82984aaf..2a56b31cb2 100644 --- a/counterparty-core/tools/compareledger.py +++ b/counterparty-core/tools/compareledger.py @@ -149,7 +149,7 @@ def get_last_block(database_file_1, database_file_2): database_file_1 = sys.argv[1] database_file_2 = sys.argv[2] -LAST_BLOCK = 650000 +LAST_BLOCK = 340000 # compare_ledger(database_file_1, database_file_2) check_hashes(database_file_1, database_file_2, "ledger_hash") # get_checkpoints(database_file_1) diff --git a/counterparty-core/tools/finddivergence.py b/counterparty-core/tools/finddivergence.py new file mode 100644 index 0000000000..54369fc631 --- /dev/null +++ b/counterparty-core/tools/finddivergence.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 + +import difflib +import json +import time + +import sh + +SERVER_1 = "https://dev.counterparty.io:4000/" +SERVER_2 = "http://localhost:4000/" + +START_BLOCK = 872150 + + +def compare_strings(string1, string2): + """Compare strings diff-style.""" + diff = list(difflib.unified_diff(string1.splitlines(1), string2.splitlines(1), n=0)) + if len(diff): + print("\nDifferences:") + print("\n".join(diff)) + return len(diff) + + +def get_hashes(server, block_index): + result = json.loads(sh.curl(f"{server}v2/blocks/{block_index}").strip())["result"] + return result["ledger_hash"], result["txlist_hash"] + + +def get_events(server, block_index): + result = json.loads(sh.curl(f"{server}v2/blocks/{block_index}/events").strip())["result"] + return result + + +block_index = START_BLOCK +hashes_1 = get_hashes(SERVER_1, block_index) +hashes_2 = get_hashes(SERVER_2, block_index) +while hashes_1[0] != hashes_2[0]: + print(f"Block {block_index} NOK") + time.sleep(0.1) + block_index -= 1 + hashes_1 = get_hashes(SERVER_1, block_index) + hashes_2 = get_hashes(SERVER_2, block_index) + +print(f"Block {block_index} OK") + +block_index += 1 +print(f"First bad block: {block_index}") +events_1 = get_events(SERVER_1, block_index) +events_2 = get_events(SERVER_2, block_index) + +compare_strings(json.dumps(events_1, indent=4), json.dumps(events_2, indent=4)) diff --git a/counterparty-rs/src/indexer/handlers/new.rs b/counterparty-rs/src/indexer/handlers/new.rs index f8ed99dffa..a0e03dac26 100644 --- a/counterparty-rs/src/indexer/handlers/new.rs +++ b/counterparty-rs/src/indexer/handlers/new.rs @@ -1,7 +1,7 @@ use std::cmp::max; use crossbeam_channel::bounded; -use tracing::info; +use tracing::{info, debug}; use crate::indexer::{ bitcoin_client::BitcoinClient, config::Config, database::Database, logging::setup_logging, @@ -16,11 +16,11 @@ pub fn new(config: Config) -> Result { let stopper = Stopper::new(); let client = BitcoinClient::new(&config, stopper.clone(), parallelism.into())?; let handles = client.start()?; - info!("Connecting to database: {}", config.db_dir); + debug!("Connecting to database: {}", config.db_dir); let db = Database::new(config.db_dir.to_string())?; - info!("Connected"); + debug!("Connected"); let chan = bounded(64); - info!("Initialized"); + debug!("Initialized"); Ok(Indexer { config, diff --git a/counterparty-rs/src/indexer/handlers/start.rs b/counterparty-rs/src/indexer/handlers/start.rs index 940cb1f3ee..1f97629f32 100644 --- a/counterparty-rs/src/indexer/handlers/start.rs +++ b/counterparty-rs/src/indexer/handlers/start.rs @@ -10,7 +10,7 @@ use crate::indexer::{ workers::{consumer, extractor, fetcher, new_worker_pool, orderer, producer, reporter, writer}, }; use crossbeam_channel::{bounded, unbounded}; -use tracing::info; +use tracing::{info, debug}; pub fn new( parallelism: usize, @@ -52,8 +52,8 @@ where "First Bitcoin client op: GetBlockchainHeight".into(), || client.get_blockchain_height(), )?; - info!("Starting at block height: {}", start_height); - info!("Targeting block height: {}", target_block); + debug!("Starting at block height: {}", start_height); + debug!("Targeting block height: {}", target_block); handles.append(&mut new_worker_pool( "Producer".into(), diff --git a/counterparty-rs/src/indexer/utils.rs b/counterparty-rs/src/indexer/utils.rs index 0f911f840d..c5b14877fc 100644 --- a/counterparty-rs/src/indexer/utils.rs +++ b/counterparty-rs/src/indexer/utils.rs @@ -95,11 +95,11 @@ pub fn timed(m: String, f: F) -> Result where F: FnOnce() -> Result, { - info!("{}...", m); + debug!("{}...", m); let start = Instant::now(); let result = f(); let duration = start.elapsed(); - info!("{} took {:?}", m, duration); + debug!("{} took {:?}", m, duration); result } diff --git a/release-notes/release-notes-v10.10.1.md b/release-notes/release-notes-v10.10.1.md index 44fc3511e5..7bca0252a2 100644 --- a/release-notes/release-notes-v10.10.1.md +++ b/release-notes/release-notes-v10.10.1.md @@ -11,6 +11,10 @@ - Handle correctly errors in subprocess when bootstrapping - Fix `getrawtransaction_batch()` for batches greater than `config.MAX_RPC_BATCH_SIZE` +- Better error handling for port taken +- Fix ungraceful ZeroMQ Failure +- Fix Conservation Check Failing Ungracefull +- Cleaner Gunicorn Shutdown ## Codebase @@ -22,6 +26,8 @@ ## CLI +- Accepts `--catch-up` flag before the command +- Add a locust runner to test local node (`python3 counterpartycore/test/integration/locustrunner.py` will start Locust Web UI on http://localhost:8089/). # Credits