Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Move autocommit functionality to database class
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Oct 5, 2020
1 parent 2242bec commit 8bd3ae7
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 20 deletions.
36 changes: 30 additions & 6 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,12 @@ def new_transaction(
sql_txn_timer.labels(desc).observe(duration)

async def runInteraction(
self, desc: str, func: "Callable[..., R]", *args: Any, **kwargs: Any
self,
desc: str,
func: "Callable[..., R]",
*args: Any,
db_autocommit: bool = False,
**kwargs: Any
) -> R:
"""Starts a transaction on the database and runs a given function
Expand All @@ -576,6 +581,10 @@ async def runInteraction(
database transaction (twisted.enterprise.adbapi.Transaction) as
its first argument, followed by `args` and `kwargs`.
db_autocommit: Whether to run the function in "autocommit" mode,
i.e. outside of a transaction. This is useful for transaction
that are only a single query. Currently only affects postgres.
args: positional args to pass to `func`
kwargs: named args to pass to `func`
Expand All @@ -596,6 +605,7 @@ async def runInteraction(
exception_callbacks,
func,
*args,
db_autocommit=db_autocommit,
**kwargs
)

Expand All @@ -609,7 +619,11 @@ async def runInteraction(
return cast(R, result)

async def runWithConnection(
self, func: "Callable[..., R]", *args: Any, **kwargs: Any
self,
func: "Callable[..., R]",
*args: Any,
db_autocommit: bool = False,
**kwargs: Any
) -> R:
"""Wraps the .runWithConnection() method on the underlying db_pool.
Expand All @@ -618,6 +632,9 @@ async def runWithConnection(
database connection (twisted.enterprise.adbapi.Connection) as
its first argument, followed by `args` and `kwargs`.
args: positional args to pass to `func`
db_autocommit: Whether to run the function in "autocommit" mode,
i.e. outside of a transaction. This is useful for transaction
that are only a single query. Currently only affects postgres.
kwargs: named args to pass to `func`
Returns:
Expand Down Expand Up @@ -649,10 +666,17 @@ def inner_func(conn, *args, **kwargs):
logger.debug("Reconnecting closed database connection")
conn.reconnect()

db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
)
return func(db_conn, *args, **kwargs)
try:
if db_autocommit:
self.engine.set_autocommit(conn, True)

db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
)
return func(db_conn, *args, **kwargs)
finally:
if db_autocommit:
self.engine.set_autocommit(conn, False)

return await make_deferred_yieldable(
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
Expand Down
8 changes: 8 additions & 0 deletions synapse/storage/engines/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,11 @@ def in_transaction(self, conn: Connection) -> bool:
"""Whether the connection is currently in a transaction.
"""
...

@abc.abstractmethod
def set_autocommit(self, conn: Connection, autocommit: bool):
"""Set the connections autocommit mode.
When True queries are run outside of transactions.
"""
...
3 changes: 3 additions & 0 deletions synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,6 @@ def server_version(self):

def in_transaction(self, conn: Connection) -> bool:
return conn.status != self.module.extensions.STATUS_READY # type: ignore

def set_autocommit(self, conn: Connection, autocommit: bool):
return conn.set_session(autocommit=autocommit) # type: ignore
5 changes: 5 additions & 0 deletions synapse/storage/engines/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ def server_version(self):
def in_transaction(self, conn: Connection) -> bool:
return conn.in_transaction # type: ignore

def set_autocommit(self, conn: Connection, autocommit: bool):
# Twisted doesn't let us set attributes on the connections, so we can't
# set the connection to autocommit mode.
pass


# Following functions taken from: https://github.com/coleifer/peewee

Expand Down
18 changes: 4 additions & 14 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,18 +574,6 @@ def _update_stream_positions_table_txn(self, txn: LoggingTransaction):
pos = (self.get_current_token_for_writer(self._instance_name),)
txn.execute(sql, (self._stream_name, self._instance_name, pos))

def _update_stream_positions_table_conn(self, conn: LoggingDatabaseConnection):
# We use autocommit/read committed here so that we don't have to go
# through a transaction dance, which a) adds latency and b) runs the
# risk of serialization errors.
try:
conn.conn.set_session(autocommit=True) # type: ignore

with conn.cursor(txn_name="MultiWriterIdGenerator._update_table") as cur:
self._update_stream_positions_table_txn(cur)
finally:
conn.conn.set_session(autocommit=False) # type: ignore


@attr.s(slots=True)
class _AsyncCtxManagerWrapper:
Expand Down Expand Up @@ -649,8 +637,10 @@ async def __aexit__(self, exc_type, exc, tb):
# We only do this on the success path so that the persisted current
# position points to a persisted row with the correct instance name.
if self.id_gen._writers:
await self.id_gen._db.runWithConnection(
self.id_gen._update_stream_positions_table_conn,
await self.id_gen._db.runInteraction(
"MultiWriterIdGenerator._update_table",
self.id_gen._update_stream_positions_table_txn,
db_autocommit=True,
)

return False

0 comments on commit 8bd3ae7

Please sign in to comment.