Skip to content

Commit

Permalink
docs: document SQLAlchemy async connection pooling (#758)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwotherspoon authored Jun 12, 2023
1 parent aaa343f commit bb1c72a
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 114 deletions.
125 changes: 70 additions & 55 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -430,48 +430,55 @@ Once a `Connector` object is returned by `create_async_connector` you can call
its `connect_async` method, just as you would the `connect` method:

```python
import asyncio
import asyncpg
from google.cloud.sql.connector import create_async_connector

import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

async def main():
# intialize Connector object using 'create_async_connector'
connector = await create_async_connector()
from google.cloud.sql.connector import Connector, create_async_connector

# create connection to Cloud SQL database
conn: asyncpg.Connection = await connector.connect_async(
"project:region:instance", # Cloud SQL instance connection name
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)
async def init_connection_pool(connector: Connector) -> AsyncEngine:
# initialize Connector object for connections to Cloud SQL
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect_async(
"project:region:instance", # Cloud SQL instance connection name
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)
return conn

# insert into Cloud SQL database (example)
await conn.execute("INSERT INTO ratings (title, genre, rating) VALUES ('Batman', 'Action', 8.2)")
# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
# 'async_creator' argument to 'create_async_engine'
pool = create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
)
return pool

# query Cloud SQL database (example)
results = await conn.fetch("SELECT * from ratings")
async def main():
# initialize Connector object for connections to Cloud SQL
connector = await create_async_connector()

# ... do something with results
for row in results:
print(row)
# initialize connection pool
pool = await init_connection_pool(connector)

# close asyncpg connection
await conn.close()
# example query
async with pool.connect() as conn:
await conn.execute(sqlalchemy.text("SELECT NOW()"))

# close Cloud SQL Connector
# close Connector
await connector.close_async()


# Test connection with `asyncio`
asyncio.run(main())
# dispose of connection pool
await pool.dispose()
```

For more details on interacting with an `asyncpg.Connection`, please visit
the [official documentation](https://magicstack.github.io/asyncpg/current/api/index.html).
For more details on additional database arguments with an `asyncpg.Connection`
, please visit the
[official documentation](https://magicstack.github.io/asyncpg/current/api/index.html).

### Async Context Manager

Expand All @@ -485,44 +492,52 @@ passed in as the `loop` argument to `Connector()`.
```python
import asyncio
import asyncpg

import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from google.cloud.sql.connector import Connector

async def init_connection_pool(connector: Connector) -> AsyncEngine:
# initialize Connector object for connections to Cloud SQL
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect_async(
"project:region:instance", # Cloud SQL instance connection name
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)
return conn

# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
# 'async_creator' argument to 'create_async_engine'
pool = create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
)
return pool

async def main():
# get current running event loop to be used with Connector
# initialize Connector object for connections to Cloud SQL
loop = asyncio.get_running_loop()
# intialize Connector object as async context manager
async with Connector(loop=loop) as connector:
# initialize connection pool
pool = await init_connection_pool(connector)

# create connection to Cloud SQL database
conn: asyncpg.Connection = await connector.connect_async(
"project:region:instance", # Cloud SQL instance connection name
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)

# insert into Cloud SQL database (example)
await conn.execute("INSERT INTO ratings (title, genre, rating) VALUES ('Batman', 'Action', 8.2)")

# query Cloud SQL database (example)
results = await conn.fetch("SELECT * from ratings")

# ... do something with results
for row in results:
print(row)
# example query
async with pool.connect() as conn:
await conn.execute(sqlalchemy.text("SELECT NOW()"))

# close asyncpg connection
await conn.close()

# Test connection with `asyncio`
asyncio.run(main())
# dispose of connection pool
await pool.dispose()
```

## Support policy

### Major version lifecycle

This project uses [semantic versioning](https://semver.org/), and uses the
following lifecycle regarding support for a major version:

Expand Down
2 changes: 1 addition & 1 deletion requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pytest==7.3.1
mock==5.0.2
pytest-cov==4.1.0
pytest-asyncio==0.21.0
SQLAlchemy==2.0.15
SQLAlchemy==2.0.16
sqlalchemy-pytds==0.3.5
flake8==5.0.4
flake8-annotations==2.9.1
Expand Down
81 changes: 52 additions & 29 deletions tests/system/test_asyncpg_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,76 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import asyncio
import os
from typing import AsyncGenerator
import uuid

import asyncpg
import pytest
import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from google.cloud.sql.connector import create_async_connector
from google.cloud.sql.connector import Connector

table_name = f"books_{uuid.uuid4().hex}"


@pytest.fixture(name="conn")
async def setup() -> AsyncGenerator:
# initialize Cloud SQL Python Connector object
connector = await create_async_connector()
conn: asyncpg.Connection = await connector.connect_async(
os.environ["POSTGRES_CONNECTION_NAME"],
"asyncpg",
user=os.environ["POSTGRES_USER"],
password=os.environ["POSTGRES_PASS"],
db=os.environ["POSTGRES_DB"],
)
await conn.execute(
f"CREATE TABLE IF NOT EXISTS {table_name}"
" ( id CHAR(20) NOT NULL, title TEXT NOT NULL );"
# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
# 'async_creator' argument to 'create_async_engine'
async def init_connection_pool() -> AsyncEngine:
async def getconn() -> asyncpg.Connection:
loop = asyncio.get_running_loop()
# initialize Connector object for connections to Cloud SQL
async with Connector(loop=loop) as connector:
conn: asyncpg.Connection = await connector.connect_async(
os.environ["POSTGRES_CONNECTION_NAME"],
"asyncpg",
user=os.environ["POSTGRES_USER"],
password=os.environ["POSTGRES_PASS"],
db=os.environ["POSTGRES_DB"],
)
return conn

# create SQLAlchemy connection pool
pool = create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
execution_options={"isolation_level": "AUTOCOMMIT"},
)
return pool

yield conn

await conn.execute(f"DROP TABLE IF EXISTS {table_name}")
# close asyncpg connection
await conn.close()
# cleanup Connector object
await connector.close_async()
@pytest.fixture(name="pool")
async def setup() -> AsyncGenerator:
pool = await init_connection_pool()
async with pool.connect() as conn:
await conn.execute(
sqlalchemy.text(
f"CREATE TABLE IF NOT EXISTS {table_name}"
" ( id CHAR(20) NOT NULL, title TEXT NOT NULL );"
)
)

yield pool

async with pool.connect() as conn:
await conn.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {table_name}"))
# dispose of asyncpg connection pool
await pool.dispose()


@pytest.mark.asyncio
async def test_connection_with_asyncpg(conn: asyncpg.Connection) -> None:
await conn.execute(
f"INSERT INTO {table_name} (id, title) VALUES ('book1', 'Book One')"
)
await conn.execute(
f"INSERT INTO {table_name} (id, title) VALUES ('book2', 'Book Two')"
async def test_connection_with_asyncpg(pool: AsyncEngine) -> None:
insert_stmt = sqlalchemy.text(
f"INSERT INTO {table_name} (id, title) VALUES (:id, :title)",
)
async with pool.connect() as conn:
await conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"})
await conn.execute(insert_stmt, parameters={"id": "book2", "title": "Book Two"})

rows = await conn.fetch(f"SELECT title FROM {table_name} ORDER BY ID")
titles = [row[0] for row in rows]
select_stmt = sqlalchemy.text(f"SELECT title FROM {table_name} ORDER BY ID;")
rows = (await conn.execute(select_stmt)).fetchall()
titles = [row[0] for row in rows]

assert titles == ["Book One", "Book Two"]
81 changes: 52 additions & 29 deletions tests/system/test_asyncpg_iam_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,76 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import asyncio
import os
from typing import AsyncGenerator
import uuid

import asyncpg
import pytest
import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from google.cloud.sql.connector import create_async_connector
from google.cloud.sql.connector import Connector

table_name = f"books_{uuid.uuid4().hex}"


@pytest.fixture(name="conn")
async def setup() -> AsyncGenerator:
# initialize Cloud SQL Python Connector object
connector = await create_async_connector()
conn: asyncpg.Connection = await connector.connect_async(
os.environ["POSTGRES_IAM_CONNECTION_NAME"],
"asyncpg",
user=os.environ["POSTGRES_IAM_USER"],
db=os.environ["POSTGRES_DB"],
enable_iam_auth=True,
)
await conn.execute(
f"CREATE TABLE IF NOT EXISTS {table_name}"
" ( id CHAR(20) NOT NULL, title TEXT NOT NULL );"
# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
# 'async_creator' argument to 'create_async_engine'
async def init_connection_pool() -> AsyncEngine:
async def getconn() -> asyncpg.Connection:
loop = asyncio.get_running_loop()
# initialize Connector object for connections to Cloud SQL
async with Connector(loop=loop) as connector:
conn: asyncpg.Connection = await connector.connect_async(
os.environ["POSTGRES_IAM_CONNECTION_NAME"],
"asyncpg",
user=os.environ["POSTGRES_IAM_USER"],
db=os.environ["POSTGRES_DB"],
enable_iam_auth=True,
)
return conn

# create SQLAlchemy connection pool
pool = create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
execution_options={"isolation_level": "AUTOCOMMIT"},
)
return pool

yield conn

await conn.execute(f"DROP TABLE IF EXISTS {table_name}")
# close asyncpg connection
await conn.close()
# cleanup Connector object
await connector.close_async()
@pytest.fixture(name="pool")
async def setup() -> AsyncGenerator:
pool = await init_connection_pool()
async with pool.connect() as conn:
await conn.execute(
sqlalchemy.text(
f"CREATE TABLE IF NOT EXISTS {table_name}"
" ( id CHAR(20) NOT NULL, title TEXT NOT NULL );"
)
)

yield pool

async with pool.connect() as conn:
await conn.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {table_name}"))
# dispose of asyncpg connection pool
await pool.dispose()


@pytest.mark.asyncio
async def test_connection_with_asyncpg_iam_auth(conn: asyncpg.Connection) -> None:
await conn.execute(
f"INSERT INTO {table_name} (id, title) VALUES ('book1', 'Book One')"
)
await conn.execute(
f"INSERT INTO {table_name} (id, title) VALUES ('book2', 'Book Two')"
async def test_connection_with_asyncpg_iam_auth(pool: AsyncEngine) -> None:
insert_stmt = sqlalchemy.text(
f"INSERT INTO {table_name} (id, title) VALUES (:id, :title)",
)
async with pool.connect() as conn:
await conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"})
await conn.execute(insert_stmt, parameters={"id": "book2", "title": "Book Two"})

rows = await conn.fetch(f"SELECT title FROM {table_name} ORDER BY ID")
titles = [row[0] for row in rows]
select_stmt = sqlalchemy.text(f"SELECT title FROM {table_name} ORDER BY ID;")
rows = (await conn.execute(select_stmt)).fetchall()
titles = [row[0] for row in rows]

assert titles == ["Book One", "Book Two"]

0 comments on commit bb1c72a

Please sign in to comment.