From 96312536f28ef73c61f9e82b34d35255cd4fa84e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 7 Sep 2020 13:04:10 +0100 Subject: [PATCH] Refuse to upgrade database on worker processes (#8266) --- changelog.d/8266.misc | 1 + synapse/storage/prepare_database.py | 78 ++++++++++++++++++++++------- 2 files changed, 62 insertions(+), 17 deletions(-) create mode 100644 changelog.d/8266.misc diff --git a/changelog.d/8266.misc b/changelog.d/8266.misc new file mode 100644 index 000000000000..e7c899bea882 --- /dev/null +++ b/changelog.d/8266.misc @@ -0,0 +1 @@ +Do not attempt to upgrade upgrade database schema on worker processes. diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 964d8d9eb876..93dca96476b2 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -47,6 +47,22 @@ class UpgradeDatabaseException(PrepareDatabaseException): pass +OUTDATED_SCHEMA_ON_WORKER_ERROR = ( + "Expected database schema version %i but got %i: run the main synapse process to " + "upgrade the database schema before starting worker processes." +) + +EMPTY_DATABASE_ON_WORKER_ERROR = ( + "Uninitialised database: run the main synapse process to prepare the database " + "schema before starting worker processes." +) + +UNAPPLIED_DELTA_ON_WORKER_ERROR = ( + "Database schema delta %s has not been applied: run the main synapse process to " + "upgrade the database schema before starting worker processes." +) + + def prepare_database(db_conn, database_engine, config, databases=["main", "state"]): """Prepares a physical database for usage. Will either create all necessary tables or upgrade from an older schema version. @@ -71,25 +87,35 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state if version_info: user_version, delta_files, upgraded = version_info + # config should only be None when we are preparing an in-memory SQLite db, + # which should be empty. if config is None: - if user_version != SCHEMA_VERSION: - # If we don't pass in a config file then we are expecting to - # have already upgraded the DB. - raise UpgradeDatabaseException( - "Expected database schema version %i but got %i" - % (SCHEMA_VERSION, user_version) - ) - else: - _upgrade_existing_database( - cur, - user_version, - delta_files, - upgraded, - database_engine, - config, - databases=databases, + raise ValueError( + "config==None in prepare_database, but databse is not empty" ) + + # if it's a worker app, refuse to upgrade the database, to avoid multiple + # workers doing it at once. + if config.worker_app is not None and user_version != SCHEMA_VERSION: + raise UpgradeDatabaseException( + OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version) + ) + + _upgrade_existing_database( + cur, + user_version, + delta_files, + upgraded, + database_engine, + config, + databases=databases, + ) else: + # if it's a worker app, refuse to upgrade the database, to avoid multiple + # workers doing it at once. + if config and config.worker_app is not None: + raise UpgradeDatabaseException(EMPTY_DATABASE_ON_WORKER_ERROR) + _setup_new_database(cur, database_engine, databases=databases) # check if any of our configured dynamic modules want a database @@ -295,6 +321,8 @@ def _upgrade_existing_database( else: assert config + is_worker = config and config.worker_app is not None + if current_version > SCHEMA_VERSION: raise ValueError( "Cannot use this database as it is too " @@ -322,7 +350,7 @@ def _upgrade_existing_database( specific_engine_extensions = (".sqlite", ".postgres") for v in range(start_ver, SCHEMA_VERSION + 1): - logger.info("Upgrading schema to v%d", v) + logger.info("Applying schema deltas for v%d", v) # We need to search both the global and per data store schema # directories for schema updates. @@ -382,9 +410,15 @@ def _upgrade_existing_database( continue root_name, ext = os.path.splitext(file_name) + if ext == ".py": # This is a python upgrade module. We need to import into some # package and then execute its `run_upgrade` function. + if is_worker: + raise PrepareDatabaseException( + UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path + ) + module_name = "synapse.storage.v%d_%s" % (v, root_name) with open(absolute_path) as python_file: module = imp.load_source(module_name, absolute_path, python_file) @@ -399,10 +433,18 @@ def _upgrade_existing_database( continue elif ext == ".sql": # A plain old .sql file, just read and execute it + if is_worker: + raise PrepareDatabaseException( + UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path + ) logger.info("Applying schema %s", relative_path) executescript(cur, absolute_path) elif ext == specific_engine_extension and root_name.endswith(".sql"): # A .sql file specific to our engine; just read and execute it + if is_worker: + raise PrepareDatabaseException( + UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path + ) logger.info("Applying engine-specific schema %s", relative_path) executescript(cur, absolute_path) elif ext in specific_engine_extensions and root_name.endswith(".sql"): @@ -432,6 +474,8 @@ def _upgrade_existing_database( (v, True), ) + logger.info("Schema now up to date") + def _apply_module_schemas(txn, database_engine, config): """Apply the module schemas for the dynamic modules, if any