From bb1c72a74f879e0752e6156cd1c0335a196ca882 Mon Sep 17 00:00:00 2001 From: Jack Wotherspoon Date: Mon, 12 Jun 2023 14:07:29 -0400 Subject: [PATCH] docs: document SQLAlchemy async connection pooling (#758) --- README.md | 125 +++++++++++++----------- requirements-test.txt | 2 +- tests/system/test_asyncpg_connection.py | 81 +++++++++------ tests/system/test_asyncpg_iam_auth.py | 81 +++++++++------ 4 files changed, 175 insertions(+), 114 deletions(-) diff --git a/README.md b/README.md index 34e81541..a372e715 100644 --- a/README.md +++ b/README.md @@ -430,48 +430,55 @@ Once a `Connector` object is returned by `create_async_connector` you can call its `connect_async` method, just as you would the `connect` method: ```python -import asyncio import asyncpg -from google.cloud.sql.connector import create_async_connector +import sqlalchemy +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine -async def main(): - # intialize Connector object using 'create_async_connector' - connector = await create_async_connector() +from google.cloud.sql.connector import Connector, create_async_connector - # create connection to Cloud SQL database - conn: asyncpg.Connection = await connector.connect_async( - "project:region:instance", # Cloud SQL instance connection name - "asyncpg", - user="my-user", - password="my-password", - db="my-db-name" - # ... additional database driver args - ) +async def init_connection_pool(connector: Connector) -> AsyncEngine: + # initialize Connector object for connections to Cloud SQL + async def getconn() -> asyncpg.Connection: + conn: asyncpg.Connection = await connector.connect_async( + "project:region:instance", # Cloud SQL instance connection name + "asyncpg", + user="my-user", + password="my-password", + db="my-db-name" + # ... additional database driver args + ) + return conn - # insert into Cloud SQL database (example) - await conn.execute("INSERT INTO ratings (title, genre, rating) VALUES ('Batman', 'Action', 8.2)") + # The Cloud SQL Python Connector can be used along with SQLAlchemy using the + # 'async_creator' argument to 'create_async_engine' + pool = create_async_engine( + "postgresql+asyncpg://", + async_creator=getconn, + ) + return pool - # query Cloud SQL database (example) - results = await conn.fetch("SELECT * from ratings") +async def main(): + # initialize Connector object for connections to Cloud SQL + connector = await create_async_connector() - # ... do something with results - for row in results: - print(row) + # initialize connection pool + pool = await init_connection_pool(connector) - # close asyncpg connection - await conn.close() + # example query + async with pool.connect() as conn: + await conn.execute(sqlalchemy.text("SELECT NOW()")) - # close Cloud SQL Connector + # close Connector await connector.close_async() - -# Test connection with `asyncio` -asyncio.run(main()) + # dispose of connection pool + await pool.dispose() ``` -For more details on interacting with an `asyncpg.Connection`, please visit -the [official documentation](https://magicstack.github.io/asyncpg/current/api/index.html). +For more details on additional database arguments with an `asyncpg.Connection` +, please visit the +[official documentation](https://magicstack.github.io/asyncpg/current/api/index.html). ### Async Context Manager @@ -485,44 +492,52 @@ passed in as the `loop` argument to `Connector()`. ```python import asyncio import asyncpg + +import sqlalchemy +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine + from google.cloud.sql.connector import Connector +async def init_connection_pool(connector: Connector) -> AsyncEngine: + # initialize Connector object for connections to Cloud SQL + async def getconn() -> asyncpg.Connection: + conn: asyncpg.Connection = await connector.connect_async( + "project:region:instance", # Cloud SQL instance connection name + "asyncpg", + user="my-user", + password="my-password", + db="my-db-name" + # ... additional database driver args + ) + return conn + + # The Cloud SQL Python Connector can be used along with SQLAlchemy using the + # 'async_creator' argument to 'create_async_engine' + pool = create_async_engine( + "postgresql+asyncpg://", + async_creator=getconn, + ) + return pool + async def main(): - # get current running event loop to be used with Connector + # initialize Connector object for connections to Cloud SQL loop = asyncio.get_running_loop() - # intialize Connector object as async context manager async with Connector(loop=loop) as connector: + # initialize connection pool + pool = await init_connection_pool(connector) - # create connection to Cloud SQL database - conn: asyncpg.Connection = await connector.connect_async( - "project:region:instance", # Cloud SQL instance connection name - "asyncpg", - user="my-user", - password="my-password", - db="my-db-name" - # ... additional database driver args - ) - - # insert into Cloud SQL database (example) - await conn.execute("INSERT INTO ratings (title, genre, rating) VALUES ('Batman', 'Action', 8.2)") - - # query Cloud SQL database (example) - results = await conn.fetch("SELECT * from ratings") - - # ... do something with results - for row in results: - print(row) + # example query + async with pool.connect() as conn: + await conn.execute(sqlalchemy.text("SELECT NOW()")) - # close asyncpg connection - await conn.close() - -# Test connection with `asyncio` -asyncio.run(main()) + # dispose of connection pool + await pool.dispose() ``` ## Support policy ### Major version lifecycle + This project uses [semantic versioning](https://semver.org/), and uses the following lifecycle regarding support for a major version: diff --git a/requirements-test.txt b/requirements-test.txt index c30e5bd6..d68ca4e8 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -2,7 +2,7 @@ pytest==7.3.1 mock==5.0.2 pytest-cov==4.1.0 pytest-asyncio==0.21.0 -SQLAlchemy==2.0.15 +SQLAlchemy==2.0.16 sqlalchemy-pytds==0.3.5 flake8==5.0.4 flake8-annotations==2.9.1 diff --git a/tests/system/test_asyncpg_connection.py b/tests/system/test_asyncpg_connection.py index 5e5b438c..1769f0fb 100644 --- a/tests/system/test_asyncpg_connection.py +++ b/tests/system/test_asyncpg_connection.py @@ -13,53 +13,76 @@ See the License for the specific language governing permissions and limitations under the License. """ +import asyncio import os from typing import AsyncGenerator import uuid import asyncpg import pytest +import sqlalchemy +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine -from google.cloud.sql.connector import create_async_connector +from google.cloud.sql.connector import Connector table_name = f"books_{uuid.uuid4().hex}" -@pytest.fixture(name="conn") -async def setup() -> AsyncGenerator: - # initialize Cloud SQL Python Connector object - connector = await create_async_connector() - conn: asyncpg.Connection = await connector.connect_async( - os.environ["POSTGRES_CONNECTION_NAME"], - "asyncpg", - user=os.environ["POSTGRES_USER"], - password=os.environ["POSTGRES_PASS"], - db=os.environ["POSTGRES_DB"], - ) - await conn.execute( - f"CREATE TABLE IF NOT EXISTS {table_name}" - " ( id CHAR(20) NOT NULL, title TEXT NOT NULL );" +# The Cloud SQL Python Connector can be used along with SQLAlchemy using the +# 'async_creator' argument to 'create_async_engine' +async def init_connection_pool() -> AsyncEngine: + async def getconn() -> asyncpg.Connection: + loop = asyncio.get_running_loop() + # initialize Connector object for connections to Cloud SQL + async with Connector(loop=loop) as connector: + conn: asyncpg.Connection = await connector.connect_async( + os.environ["POSTGRES_CONNECTION_NAME"], + "asyncpg", + user=os.environ["POSTGRES_USER"], + password=os.environ["POSTGRES_PASS"], + db=os.environ["POSTGRES_DB"], + ) + return conn + + # create SQLAlchemy connection pool + pool = create_async_engine( + "postgresql+asyncpg://", + async_creator=getconn, + execution_options={"isolation_level": "AUTOCOMMIT"}, ) + return pool - yield conn - await conn.execute(f"DROP TABLE IF EXISTS {table_name}") - # close asyncpg connection - await conn.close() - # cleanup Connector object - await connector.close_async() +@pytest.fixture(name="pool") +async def setup() -> AsyncGenerator: + pool = await init_connection_pool() + async with pool.connect() as conn: + await conn.execute( + sqlalchemy.text( + f"CREATE TABLE IF NOT EXISTS {table_name}" + " ( id CHAR(20) NOT NULL, title TEXT NOT NULL );" + ) + ) + + yield pool + + async with pool.connect() as conn: + await conn.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {table_name}")) + # dispose of asyncpg connection pool + await pool.dispose() @pytest.mark.asyncio -async def test_connection_with_asyncpg(conn: asyncpg.Connection) -> None: - await conn.execute( - f"INSERT INTO {table_name} (id, title) VALUES ('book1', 'Book One')" - ) - await conn.execute( - f"INSERT INTO {table_name} (id, title) VALUES ('book2', 'Book Two')" +async def test_connection_with_asyncpg(pool: AsyncEngine) -> None: + insert_stmt = sqlalchemy.text( + f"INSERT INTO {table_name} (id, title) VALUES (:id, :title)", ) + async with pool.connect() as conn: + await conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"}) + await conn.execute(insert_stmt, parameters={"id": "book2", "title": "Book Two"}) - rows = await conn.fetch(f"SELECT title FROM {table_name} ORDER BY ID") - titles = [row[0] for row in rows] + select_stmt = sqlalchemy.text(f"SELECT title FROM {table_name} ORDER BY ID;") + rows = (await conn.execute(select_stmt)).fetchall() + titles = [row[0] for row in rows] assert titles == ["Book One", "Book Two"] diff --git a/tests/system/test_asyncpg_iam_auth.py b/tests/system/test_asyncpg_iam_auth.py index fbb73f9a..5cda850b 100644 --- a/tests/system/test_asyncpg_iam_auth.py +++ b/tests/system/test_asyncpg_iam_auth.py @@ -13,53 +13,76 @@ See the License for the specific language governing permissions and limitations under the License. """ +import asyncio import os from typing import AsyncGenerator import uuid import asyncpg import pytest +import sqlalchemy +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine -from google.cloud.sql.connector import create_async_connector +from google.cloud.sql.connector import Connector table_name = f"books_{uuid.uuid4().hex}" -@pytest.fixture(name="conn") -async def setup() -> AsyncGenerator: - # initialize Cloud SQL Python Connector object - connector = await create_async_connector() - conn: asyncpg.Connection = await connector.connect_async( - os.environ["POSTGRES_IAM_CONNECTION_NAME"], - "asyncpg", - user=os.environ["POSTGRES_IAM_USER"], - db=os.environ["POSTGRES_DB"], - enable_iam_auth=True, - ) - await conn.execute( - f"CREATE TABLE IF NOT EXISTS {table_name}" - " ( id CHAR(20) NOT NULL, title TEXT NOT NULL );" +# The Cloud SQL Python Connector can be used along with SQLAlchemy using the +# 'async_creator' argument to 'create_async_engine' +async def init_connection_pool() -> AsyncEngine: + async def getconn() -> asyncpg.Connection: + loop = asyncio.get_running_loop() + # initialize Connector object for connections to Cloud SQL + async with Connector(loop=loop) as connector: + conn: asyncpg.Connection = await connector.connect_async( + os.environ["POSTGRES_IAM_CONNECTION_NAME"], + "asyncpg", + user=os.environ["POSTGRES_IAM_USER"], + db=os.environ["POSTGRES_DB"], + enable_iam_auth=True, + ) + return conn + + # create SQLAlchemy connection pool + pool = create_async_engine( + "postgresql+asyncpg://", + async_creator=getconn, + execution_options={"isolation_level": "AUTOCOMMIT"}, ) + return pool - yield conn - await conn.execute(f"DROP TABLE IF EXISTS {table_name}") - # close asyncpg connection - await conn.close() - # cleanup Connector object - await connector.close_async() +@pytest.fixture(name="pool") +async def setup() -> AsyncGenerator: + pool = await init_connection_pool() + async with pool.connect() as conn: + await conn.execute( + sqlalchemy.text( + f"CREATE TABLE IF NOT EXISTS {table_name}" + " ( id CHAR(20) NOT NULL, title TEXT NOT NULL );" + ) + ) + + yield pool + + async with pool.connect() as conn: + await conn.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {table_name}")) + # dispose of asyncpg connection pool + await pool.dispose() @pytest.mark.asyncio -async def test_connection_with_asyncpg_iam_auth(conn: asyncpg.Connection) -> None: - await conn.execute( - f"INSERT INTO {table_name} (id, title) VALUES ('book1', 'Book One')" - ) - await conn.execute( - f"INSERT INTO {table_name} (id, title) VALUES ('book2', 'Book Two')" +async def test_connection_with_asyncpg_iam_auth(pool: AsyncEngine) -> None: + insert_stmt = sqlalchemy.text( + f"INSERT INTO {table_name} (id, title) VALUES (:id, :title)", ) + async with pool.connect() as conn: + await conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"}) + await conn.execute(insert_stmt, parameters={"id": "book2", "title": "Book Two"}) - rows = await conn.fetch(f"SELECT title FROM {table_name} ORDER BY ID") - titles = [row[0] for row in rows] + select_stmt = sqlalchemy.text(f"SELECT title FROM {table_name} ORDER BY ID;") + rows = (await conn.execute(select_stmt)).fetchall() + titles = [row[0] for row in rows] assert titles == ["Book One", "Book Two"]