Support connection pooling for async drivers #422

Closed
jackwotherspoon opened this issue Aug 9, 2022 · 22 comments · Fixed by #758

@jackwotherspoon
Collaborator

jackwotherspoon commented Aug 9, 2022

The Cloud SQL Python Connector recommends using our library with connection pooling (usually through the SQLAlchemy library).

However, SQLAlchemy's create_async_engine method currently does not allow the use of an asynchronous creator argument (#8215).

This makes it very difficult to use connection pooling with the Cloud SQL Python Connector's async drivers. There is currently a workaround, but it is both confusing and not practical to recommend to our users:

import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.util import await_only
from google.cloud.sql.connector import Connector

async def async_creator():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        conn = await connector.connect_async(
            "project:region:instance", # Cloud SQL instance connection name"
            "asyncpg",
            user="my-db-user",
            password="my-db-password",
            db="my-db-name",
        )
        return conn

async def async_main():
    def adapted_creator():
        dbapi = engine.dialect.dbapi
        from sqlalchemy.dialects.postgresql.asyncpg import (
            AsyncAdapt_asyncpg_connection,
        )
        return AsyncAdapt_asyncpg_connection(
            dbapi,
            await_only(async_creator()),
            prepared_statement_cache_size=100,
        )

    # create async connection pool with wrapped creator
    engine = create_async_engine(
        "postgresql+asyncpg://",
        echo=True,
        creator=adapted_creator,
    )
    
    # use connection pooling with Cloud SQL Python Connector
    async with engine.connect() as conn:
        query = await conn.execute(text("SELECT * from ratings"))
        results = query.fetchall()
        for row in results:
            print(row)


asyncio.run(async_main())

For this reason, we should look at the possibility of supporting and returning native connection pools from the Cloud SQL Python Connector for async drivers (e.g. asyncpg.create_pool).

This would allow users to get the benefits of connection pooling without needing the confusing SQLAlchemy workaround.

I would suggest a connector.create_pool method, or something similar, that has the exact same interface as connector.connect_async:

    # initialize Connector object using 'create_async_connector'
    connector = await create_async_connector()

    # create connection pool to Cloud SQL database
    conn: asyncpg.Pool = await connector.create_pool(
        "project:region:instance", # Cloud SQL instance connection name
        "asyncpg",
        user="my-db-user",
        password="my-db-pass",
        db="my-db-name",
        # ... additional database driver or connection pool args 
    )

See the comment below for the reasoning behind not supporting native connection pooling in this library.

@jackwotherspoon jackwotherspoon added type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. labels Aug 9, 2022
@dbarkley

The suggested implementation looks to be exactly what we need. Any timeline on a release?

@dbarkley

@jackwotherspoon not sure if any work has started on an implementation, but I had a quick look and it seems that a slight change to asyncpg's connect and connect_utils.__connect_addr, making the ssl parameter callable in the same way as the password parameter, would make it rather trivial to call down to asyncpg.create_pool from the Connector. Unless more than the SSL changes on refresh, that is?

If you agree I can have a go at doing the work.

Bit of extra context: we use asyncpg directly rather than SQLAlchemy, so the workaround isn't suitable for us. I looked at alternative implementations without any changes to asyncpg, but could only come up with an ugly task that calls set_connect_args after the expiry time should have passed.
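(For reference, the "ugly task" described above would look roughly like the sketch below. get_fresh_connect_kwargs is purely hypothetical: it would need to pull a refreshed SSL context and connection metadata out of the Connector, which the Connector does not currently expose.)

import asyncio
import asyncpg

async def get_fresh_connect_kwargs() -> dict:
    # Hypothetical helper: would have to extract refreshed SSL/connection
    # details from the Connector, which has no public API for this today.
    raise NotImplementedError

async def keep_connect_args_fresh(pool: asyncpg.Pool, interval: float = 45 * 60) -> None:
    # Background task that pushes refreshed connection arguments into the
    # pool before the current ephemeral certificate expires, so connections
    # created by the pool after that point still succeed.
    while True:
        await asyncio.sleep(interval)
        pool.set_connect_args(**await get_fresh_connect_kwargs())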

@enocom enocom changed the title Support/document connection pooling for async drivers Support connection pooling for async drivers Mar 30, 2023
@jackwotherspoon
Collaborator Author

Hi @dbarkley, thanks for the interest in this! 😄

I have not yet begun developing this feature. I am still trying to decide whether or not the Connector should have an interface for creating connection pools. Currently we only return database connection objects across all our drivers and don't return native connection pool objects.

Ideally, SQLAlchemy would support a creator argument for its async interface as described above.

Is there a reason you want to use asyncpg pools directly rather than SQLAlchemy pools? I am curious whether there is a performance difference between the two. I wonder if changes could be made to asyncpg itself to create pools from a connection object like SQLAlchemy does.

Let me know if you have any thoughts. Thanks so much for the interest on this. 😀

@dbarkley

I can't speak to performance. It's just that at this point we've got a lot of places where asyncpg pools are expected.

This is what I came to in terms of implementation...

For asyncpg we can make ssl callable WayhomeUK/asyncpg#1
For the connector we just call down to asyncpg.create_pool with a callable that references the instance WayhomeUK#1

Tests for the direct_tls=True path in asyncpg are lacking and probably would be required.

@jackwotherspoon
Collaborator Author

@dbarkley Thanks for the links to your implementation! Will take a look.

One main question... why does ssl need to be callable? Is this so that the refreshed context gets updated in the pool?

Appreciate you taking the time to look into this! Have an awesome day.

@dbarkley

@jackwotherspoon My understanding is that asyncpg currently caches the initial context and uses it for subsequent connections, and that the context has a limited lifetime. I may have misunderstood the context expiry though, if you think it's not required.

@dbarkley

@jackwotherspoon have you had any more time to consider these changes? If I'm able to get the asyncpg change pushed through, how would you feel about the suggested change to the connector?

@jackwotherspoon
Collaborator Author

@dbarkley Sorry for the delay in response! Let me chat it over once more with the team today to see whether native connection pooling is something we want to support or not in the Connector.

I will also play around with alternatives this weekend. Will get back to you on Monday with the decision 😄

@chamini2

Eager to learn what decisions are made! We are also interested in using pooling with this library. Our code also assumes a pool is in place, and our current workaround is to build a new connection every time while we wait for pooling to be natively supported.

@jackwotherspoon
Collaborator Author

jackwotherspoon commented Apr 17, 2023

@chamini2 @dbarkley Thanks for the interest here!

After talking it over with the team, we have decided that we don't want to handle the creation of native connection pools within this library and will instead keep the library focused purely on creating secure connection objects. We want to leave connection pooling to libraries that will always beat us in performance (e.g. asyncpg itself or pooling libraries like SQLAlchemy).

I think the proper way to go about this would be to open a feature request on the asyncpg library to allow asyncpg.create_pool to take a callable creator-style argument that is used to create the connections managed by the pool.

That way, a native asyncpg.Pool object could be created as follows:

import asyncio
import asyncpg
from google.cloud.sql.connector import Connector

async def async_creator():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        conn = await connector.connect_async(
            "project:region:instance", # Cloud SQL instance connection name"
            "asyncpg",
            user="my-db-user",
            password="my-db-password",
            db="my-db-name",
        )
        return conn

async def main():
    pool = await asyncpg.create_pool(init=async_creator)
    # ... interact with connection pool

asyncio.run(main())

I'm curious what you think about this approach.

Let me know if this sounds like it would work for your use case, as I am happy to open the feature request on asyncpg myself if need be. We will keep investigating approaches to make connection pooling easier with our library for async database drivers. I am going to set aside some time over the next couple of weeks to try to get to a solution that works better than the current SQLAlchemy workaround.

@dbarkley

@jackwotherspoon that all makes sense to me.

@jackwotherspoon
Collaborator Author

@dbarkley So it turns out asyncpg may already support this feature.

asyncpg.create_pool takes an init arg that is defined as "A coroutine to initialize a connection when it is created". This may potentially work for the example in my response above!

I have yet to formally test it but will do so in the coming days.
https://github.com/MagicStack/asyncpg/blob/172b8f693f4b6885bec60001964fee13fc8de644/asyncpg/pool.py#L1112

@dbarkley

@jackwotherspoon I hope you're right, but I think that's just for setting up encoders/decoders and similar. We are already using that init param for encoding and decoding PostGIS geometry types, for example.
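(For readers following this thread: asyncpg's init hook runs per-connection setup on a connection the pool has already created, which is why it cannot act as a creator. A minimal sketch of its typical use, assuming a placeholder DSN, might look like the following.)

import asyncio
import json
import asyncpg

async def init(conn: asyncpg.Connection) -> None:
    # init receives a connection asyncpg has already established;
    # it is intended for per-connection setup such as registering codecs.
    await conn.set_type_codec(
        "json", encoder=json.dumps, decoder=json.loads, schema="pg_catalog"
    )

async def main() -> None:
    # The DSN below is a placeholder; note that init fires after the
    # connection is made, not instead of making it.
    pool = await asyncpg.create_pool(
        "postgresql://my-user:my-pass@localhost/my-db", init=init
    )
    async with pool.acquire() as conn:
        print(await conn.fetchval("SELECT 1"))
    await pool.close()

asyncio.run(main())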

@SishaarRao

SishaarRao commented Apr 21, 2023

@dbarkley @jackwotherspoon Hello, I've been working on building a production system that uses the cloud-sql-python-connector library and the asyncpg Postgres driver. AFAIK the init parameter does not enable using asyncpg connection pooling with a connector-created connection. Here is an example:

import asyncpg
import os
from google.cloud.sql.connector import create_async_connector, Connector
from asyncio import Lock

class _PostgresService:
  def __init__(self):
    self.port = 5432
    self.database = 'postgres'
    self.pool: asyncpg.Pool = None
    self.conn = None
    self._connector: Connector = None

  async def async_creator(self, *args, **kwargs):
    return await self._connector.connect_async(
      '<My Cloud SQL Instance Name>',
      'asyncpg',
      user="<elided>",
      password="<elided>",
      db=self.database,
      port=self.port
    )

  async def connect(self):
    self._connector = await create_async_connector()

    # Create a pool, with the `async_creator` method that creates a connection
    # UNCOMMENT THIS SO THAT CONNECTING FAILS
    # self.pool = await asyncpg.create_pool(
    #   init=self.async_creator,
    #   min_size=1,
    #   max_size=2,
    # )

    # Create a single connection with the creator
    self.conn = await self.async_creator()

  async def disconnect(self):
    if self.pool:
      await self.pool.close()
    if self._connector:
      await self._connector.close_async()

  async def get_recording_url(self, phone_number, time, limit=5):
    query = """
      SELECT COALESCE(...) AS ...
      FROM ...
      INNER JOIN ...
      WHERE c."prospectNumber" = $1
      AND COALESCE(...) IS NOT NULL
      AND ...
      AND ...
      AND ... < $2
      ORDER BY ... DESC
      LIMIT $3
    """
    # Use the connection pool to make a query
    # async with self.pool.acquire() as conn:
      # result = await conn.fetch(query, phone_number, time, limit)

    # Make a query just with the connection
    result = await self.conn.fetch(query, phone_number, time, limit)

    if result and len(result) > 0:
      return [r.get("recordingUrl", None) for r in result]
    return None

Database = _PostgresService()
await Database.connect() # This FAILS when trying to use connection pooling

I believe this fails because create_pool's init function does not get called until after the connection has been created (to verify this, put a print in async_creator). But we want to actually create the connection in init. 🐔 & 🥚

Given this information, what are the next steps/workarounds? I'm a bit disappointed that this hasn't already been solved, as connection pooling is a very common requirement for production-grade software. I opted to use asyncpg because I did not want an ORM, just a pure PG client that was also async/await compatible. Should I consider a different library entirely? Or perhaps implement custom connection pooling?

@jackwotherspoon
Collaborator Author

@SishaarRao Thanks for the insights! It indeed looks like the init is not able to work for the use case as originally hoped for.

I'm a bit disappointed that this hasn't already been solved, as connection pooling is a very common requirement for production-grade software. I opted to use asyncpg because I did not want an ORM...

I understand your disappointment. The Connector only handles returning Connection objects and thus relies on the driver library or pooling libraries (such as SQLAlchemy) to handle pooling. I can advocate for an asyncpg feature request and potentially work on a patch to the library, but that is sometimes a slow process. Do you have a suggestion for how you would ideally like connection pooling to work when interacting with the Connector? I'm open to feedback and suggestions.

In terms of not wanting an ORM, I understand; however, the workaround provided in the initial issue description does not use the SQLAlchemy ORM API. SQLAlchemy provides two APIs: its ORM API and its Core API. The workaround uses the Core API, not the ORM, so it may actually work for your use case since it uses regular query syntax and execution.

Again, I appreciate the feedback and will be working over the next couple of weeks to provide a better solution for asyncpg connection pooling.

Have a great day! 😄

@SishaarRao

Hi, is there any update on this?

@jackwotherspoon
Collaborator Author

I'm currently working on an upstream PR to SQLAlchemy to hopefully get native support for a creator type argument on the create_async_engine interface.

@chamini2

@jackwotherspoon, any news on sending a PR to asyncpg to add a creator argument too? I can help with that if we agree on how it should work. From the docs, it seems connection_class could be an option to get it working with today's options?

@jackwotherspoon
Collaborator Author

@chamini2 I won't be able to work on the asyncpg PR until after the SQLAlchemy one, which is still a WIP and will take a bit of time to get right.

If you want to help take an initial attempt at getting a working asyncpg PR in place that would be greatly appreciated! 😄

I think, as mentioned, we will want to add a creator argument to asyncpg.create_pool and the Pool class that defaults to None but otherwise accepts a coroutine. You can look at the init or setup arguments as guidance for creator.

When creator is not None, its coroutine should be called in _get_new_connection instead of connection.connect().

Pseudocode:

async def _get_new_connection(self):
    if self._working_addr is None:
        if self._creator:
            con = await self._creator()
        else:
            # First connection attempt on this pool.
            con = await connection.connect(
                *self._connect_args,
                loop=self._loop,
                connection_class=self._connection_class,
                record_class=self._record_class,
                **self._connect_kwargs)
    # ...

@kausarmukadam

Found this thread while setting up an async engine connection to Cloud SQL - is there a workaround that is recommended for now while the PR is in progress?

@jackwotherspoon
Collaborator Author

@kausarmukadam The workaround mentioned in the initial description of this issue works and is probably the best option to use in the meantime while I wait for PR sqlalchemy/sqlalchemy#9854 to get merged. Hoping for async_creator to make it into the next SQLAlchemy release so that we can support it here 🤞

@jackwotherspoon
Collaborator Author

The latest version of SQLAlchemy, v2.0.16, now supports an async_creator argument, which enables asyncpg connection pooling with the Cloud SQL Python Connector.

I now recommend referring to this repo's README for the recommended usage ⭐
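As a rough sketch (the instance connection name and credentials below are placeholders), the new usage with SQLAlchemy >= 2.0.16 looks something like this:

import asyncio
import asyncpg
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
from google.cloud.sql.connector import Connector

async def main():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        async def getconn() -> asyncpg.Connection:
            return await connector.connect_async(
                "project:region:instance",  # Cloud SQL instance connection name
                "asyncpg",
                user="my-db-user",
                password="my-db-password",
                db="my-db-name",
            )

        # async_creator replaces the adapted_creator workaround from the
        # issue description above
        engine = create_async_engine("postgresql+asyncpg://", async_creator=getconn)

        async with engine.connect() as conn:
            print((await conn.execute(text("SELECT 1"))).scalar())

        await engine.dispose()

asyncio.run(main())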

This feature will be live tomorrow when we release a new version of the Python Connector.

@chamini2 I'm marking this issue as closed since there is now an elegant solution, but feel free to raise an issue on asyncpg if you would like the driver to support native connection pooling in a similar way instead of having to use SQLAlchemy.
