From 8bd3ae74c0b0bef67fbaa462d343cb23b32bd96d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Oct 2020 14:16:43 +0100 Subject: [PATCH] Move autocommit functionality to database class --- synapse/storage/database.py | 36 ++++++++++++++++++++++----- synapse/storage/engines/_base.py | 8 ++++++ synapse/storage/engines/postgres.py | 3 +++ synapse/storage/engines/sqlite.py | 5 ++++ synapse/storage/util/id_generators.py | 18 +++----------- 5 files changed, 50 insertions(+), 20 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index b36ea7ccdfe6..cb6b68c0b1e2 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -566,7 +566,12 @@ def new_transaction( sql_txn_timer.labels(desc).observe(duration) async def runInteraction( - self, desc: str, func: "Callable[..., R]", *args: Any, **kwargs: Any + self, + desc: str, + func: "Callable[..., R]", + *args: Any, + db_autocommit: bool = False, + **kwargs: Any ) -> R: """Starts a transaction on the database and runs a given function @@ -576,6 +581,10 @@ async def runInteraction( database transaction (twisted.enterprise.adbapi.Transaction) as its first argument, followed by `args` and `kwargs`. + db_autocommit: Whether to run the function in "autocommit" mode, + i.e. outside of a transaction. This is useful for transaction + that are only a single query. Currently only affects postgres. + args: positional args to pass to `func` kwargs: named args to pass to `func` @@ -596,6 +605,7 @@ async def runInteraction( exception_callbacks, func, *args, + db_autocommit=db_autocommit, **kwargs ) @@ -609,7 +619,11 @@ async def runInteraction( return cast(R, result) async def runWithConnection( - self, func: "Callable[..., R]", *args: Any, **kwargs: Any + self, + func: "Callable[..., R]", + *args: Any, + db_autocommit: bool = False, + **kwargs: Any ) -> R: """Wraps the .runWithConnection() method on the underlying db_pool. @@ -618,6 +632,9 @@ async def runWithConnection( database connection (twisted.enterprise.adbapi.Connection) as its first argument, followed by `args` and `kwargs`. args: positional args to pass to `func` + db_autocommit: Whether to run the function in "autocommit" mode, + i.e. outside of a transaction. This is useful for transaction + that are only a single query. Currently only affects postgres. kwargs: named args to pass to `func` Returns: @@ -649,10 +666,17 @@ def inner_func(conn, *args, **kwargs): logger.debug("Reconnecting closed database connection") conn.reconnect() - db_conn = LoggingDatabaseConnection( - conn, self.engine, "runWithConnection" - ) - return func(db_conn, *args, **kwargs) + try: + if db_autocommit: + self.engine.set_autocommit(conn, True) + + db_conn = LoggingDatabaseConnection( + conn, self.engine, "runWithConnection" + ) + return func(db_conn, *args, **kwargs) + finally: + if db_autocommit: + self.engine.set_autocommit(conn, False) return await make_deferred_yieldable( self._db_pool.runWithConnection(inner_func, *args, **kwargs) diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 314d2e8d80a8..475270b6f398 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -103,3 +103,11 @@ def in_transaction(self, conn: Connection) -> bool: """Whether the connection is currently in a transaction. """ ... + + @abc.abstractmethod + def set_autocommit(self, conn: Connection, autocommit: bool): + """Set the connections autocommit mode. + + When True queries are run outside of transactions. + """ + ... diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 5be910dc0805..a8563faefd32 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -177,3 +177,6 @@ def server_version(self): def in_transaction(self, conn: Connection) -> bool: return conn.status != self.module.extensions.STATUS_READY # type: ignore + + def set_autocommit(self, conn: Connection, autocommit: bool): + return conn.set_session(autocommit=autocommit) # type: ignore diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 00bced3618f8..c12f5225aeed 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -110,6 +110,11 @@ def server_version(self): def in_transaction(self, conn: Connection) -> bool: return conn.in_transaction # type: ignore + def set_autocommit(self, conn: Connection, autocommit: bool): + # Twisted doesn't let us set attributes on the connections, so we can't + # set the connection to autocommit mode. + pass + # Following functions taken from: https://github.com/coleifer/peewee diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index fba37036ff59..28673f2ae693 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -574,18 +574,6 @@ def _update_stream_positions_table_txn(self, txn: LoggingTransaction): pos = (self.get_current_token_for_writer(self._instance_name),) txn.execute(sql, (self._stream_name, self._instance_name, pos)) - def _update_stream_positions_table_conn(self, conn: LoggingDatabaseConnection): - # We use autocommit/read committed here so that we don't have to go - # through a transaction dance, which a) adds latency and b) runs the - # risk of serialization errors. - try: - conn.conn.set_session(autocommit=True) # type: ignore - - with conn.cursor(txn_name="MultiWriterIdGenerator._update_table") as cur: - self._update_stream_positions_table_txn(cur) - finally: - conn.conn.set_session(autocommit=False) # type: ignore - @attr.s(slots=True) class _AsyncCtxManagerWrapper: @@ -649,8 +637,10 @@ async def __aexit__(self, exc_type, exc, tb): # We only do this on the success path so that the persisted current # position points to a persisted row with the correct instance name. if self.id_gen._writers: - await self.id_gen._db.runWithConnection( - self.id_gen._update_stream_positions_table_conn, + await self.id_gen._db.runInteraction( + "MultiWriterIdGenerator._update_table", + self.id_gen._update_stream_positions_table_txn, + db_autocommit=True, ) return False