AsyncpgDriver doesn't reconnect if db is restarted #224
Thanks for pointing this out. The AsyncpgDriver doesn't currently have built-in reconnection logic: when the connection is lost, PgQueuer stops listening, and only a full restart of PgQueuer will re-establish the listener. Your setup using FastAPI and wrapping the QueueManager in asyncio.create_task() shouldn't be the cause of this issue. The root problem is that asyncpg doesn't automatically reconnect after a database connection drop, so PgQueuer simply stops being able to LISTEN until it gets a fresh connection. To work around this, you could implement a reconnection mechanism in your setup, for instance by catching the connection error and reconnecting. I'm considering adding an automatic reconnection feature to AsyncpgDriver to handle situations like this more gracefully.
When I kill the connection locally, after some time (~dequeue_timeout) the service will shut down. As a temporary solution, you might want to do something like the following. It's hacky and dirty, but hopefully it can keep you moving for now. The code below is not tested; it is only meant to show how I think you could work around the issue (again, for now). The helper class, PgQueuerTaskManager, accepts a factory function that creates a PgQueuer instance. It ensures that the instance runs in a task and restarts it if it fails. You can use the stop method to stop the restart loop. Let me know if this works for your needs.

```python
import asyncio
from typing import Callable, Awaitable, Any


class PgQueuerTaskManager:
    """Run a PgQueuer instance in a task and restart it whenever it stops."""

    def __init__(self, factory: Callable[[], Awaitable[Any]]) -> None:
        self.factory = factory
        self.task: asyncio.Task | None = None
        self._stop = False

    async def start(self) -> None:
        # Build a fresh instance (and thus a fresh DB connection) and run it.
        pg_queuer_instance = await self.factory()
        self.task = asyncio.create_task(pg_queuer_instance.run())

    async def restart_on_failure(self) -> None:
        # Poll the running task; if it has finished (e.g. because the
        # connection dropped), start a new instance.
        while not self._stop:
            if self.task is not None and self.task.done():
                await self.start()
            await asyncio.sleep(1)

    def stop(self) -> None:
        self._stop = True
```
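For wiring it up, I'd imagine something along these lines (equally untested; the async factory opens a fresh connection on every call, and names like create_queue_manager and the DATABASE_URL environment variable are just placeholders):

```python
import asyncio
import os

import asyncpg

# pgqueuer import paths may differ slightly between versions
from pgqueuer.db import AsyncpgDriver
from pgqueuer.qm import QueueManager


async def create_queue_manager() -> QueueManager:
    # Placeholder factory: build a new connection (and thus a new LISTEN
    # subscription) on every call, so a restart always gets a fresh connection.
    connection = await asyncpg.connect(os.environ["DATABASE_URL"])
    return QueueManager(AsyncpgDriver(connection))


async def main() -> None:
    manager = PgQueuerTaskManager(create_queue_manager)
    await manager.start()
    await manager.restart_on_failure()  # loops until manager.stop() is called


if __name__ == "__main__":
    asyncio.run(main())
```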
That looks good and makes sense, thanks a lot, will try tomorrow! So I'd set it up like this, or do I need a proper async factory function?

```python
pgqtm = PgQueuerTaskManager(lambda driver: QueueManager(driver))
pgqtm.restart_on_failure()
```

I'm also thinking about implementing an HTTP liveness endpoint that queries pg_notification_queue_usage() and checks whether the notification queue is at 0. Do I get it right that if the NOTIFY listener is working, the notifications are always consumed right away, and it's only picking the task up from the table that might be delayed based on the executor behaviour (like …)?
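Roughly what I have in mind for the liveness endpoint (untested sketch; the 1% threshold and the DATABASE_URL environment variable are arbitrary placeholders, and a real version would reuse the worker's connection or a pool instead of connecting per request):

```python
import os

import asyncpg
from fastapi import FastAPI, Response

app = FastAPI()


@app.get("/health")
async def health() -> Response:
    # pg_notification_queue_usage() is server-wide, so a short-lived
    # connection is enough for the check.
    conn = await asyncpg.connect(os.environ["DATABASE_URL"])
    try:
        usage = await conn.fetchval("SELECT pg_notification_queue_usage()")
    finally:
        await conn.close()
    if usage > 0.01:  # listener is probably not consuming notifications
        return Response(content=f"notification queue usage: {usage}", status_code=503)
    return Response(content="OK", media_type="text/plain")
```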
If I were to set this up, I think I would have used FastAPI's lifespan context manager, then on teardown call `qm.shutdown.set()`. My only comment would be (regarding running PgQueuer as a task in FastAPI): if you get loads of tasks running, it might affect your API's performance. I've never used …
Yes, I'm using the lifespan approach. This won't be a "proper" API as such; I'm using FastAPI just as an interface for a Cloud Run Service to be able to manage readiness and health of the worker (and later scaling if needed). My main motivation for using PgQueuer is that a Cloud Run Job has quite a slow startup time, so replacing it with an always-on worker should make it more responsive. Most of the actual tasks in our case will be waiting around for APIs, so hardly any work is required in the instance itself. I expect that by using async, even a single container will be able to handle a decent amount of load.

```python
import asyncio
import os
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

import asyncpg
from fastapi import FastAPI, Request, Response

# pgqueuer import paths may differ slightly between versions
from pgqueuer.db import AsyncpgDriver
from pgqueuer.executors import AbstractEntrypointExecutor
from pgqueuer.models import Context, Job
from pgqueuer.qm import QueueManager


def create_app() -> FastAPI:
    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
        connection = await asyncpg.connect(os.environ.get("DATABASE_URL"))
        driver = AsyncpgDriver(connection)
        qm = QueueManager(driver)

        class WorkflowExecutor(AbstractEntrypointExecutor):
            async def execute(self, job: Job, context: Context) -> None:
                error = None
                print(f"Executing job: {job}")
                try:
                    await self.parameters.func(job)
                except Exception as e:
                    print(f"Error executing job: {e}")
                    error = e
                if error is not None:
                    raise error

        # Setup the 'test_log' entrypoint
        @qm.entrypoint("test_log", executor_factory=WorkflowExecutor)
        async def process_test_log(job: Job) -> None:
            print(f"Processed message: {job}")

        # Run QueueManager in the background
        task = asyncio.create_task(qm.run())
        try:
            yield
        finally:
            qm.shutdown.set()
            await task  # Wait for the QueueManager to finish its current task
            await connection.close()

    app = FastAPI(lifespan=lifespan)

    @app.get("/health")
    async def health(request: Request) -> Response:
        return Response(content="OK", media_type="text/plain")

    return app
```
Right, so I guess you could 'just' wrap the …
Ran the benchmark script, chilling at 10k jobs per second. It does its best to grab the events ASAP, but if there is some bad blocking code there is not much that can be done about it (avoiding heavy CPU-bound tasks in the same thread as your consumer and using async for IO should cover most of the cases I know about). So yes, given that notifications arrive, they should be picked up fairly quickly. Will try to do a round-trip latency benchmark later today. Local constraints like rps/concurrency limits will be delegated to executors (currently they live in the dispatch function), but I have not been able to refactor that part yet. Global constraints like serialized_dispatch need to be implemented in the DB, so executors do not have any control over them.

```
xxx@localhost:xxxx> select pg_notification_queue_usage();
+-----------------------------+
| pg_notification_queue_usage |
|-----------------------------|
| 0.0 |
+-----------------------------+
SELECT 1
Time: 0.008s
xxx@localhost:xxxx> \watch 1
+-----------------------------+
| pg_notification_queue_usage |
|-----------------------------|
| 0.0 |
+-----------------------------+
SELECT 1
Time: 0.006s
Waiting for 1 seconds before repeating
+-----------------------------+
| pg_notification_queue_usage |
|-----------------------------|
| 9.5367431640625e-07 |
+-----------------------------+
SELECT 1
Time: 0.006s
Waiting for 1 seconds before repeating
+-----------------------------+
| pg_notification_queue_usage |
|-----------------------------|
| 9.5367431640625e-07 |
+-----------------------------+
SELECT 1
Time: 0.005s
Waiting for 1 seconds before repeating
+-----------------------------+
| pg_notification_queue_usage |
|-----------------------------|
| 9.5367431640625e-07 |
+-----------------------------+
SELECT 1
Time: 0.006s
Waiting for 1 seconds before repeating
+-----------------------------+
| pg_notification_queue_usage |
|-----------------------------|
| 1.9073486328125e-06 |
+-----------------------------+
SELECT 1
Time: 0.005s
Waiting for 1 seconds before repeating
+-----------------------------+
| pg_notification_queue_usage |
|-----------------------------|
| 1.9073486328125e-06 |
+-----------------------------+
SELECT 1
Time: 0.005s
Waiting for 1 seconds before repeating
+-----------------------------+
| pg_notification_queue_usage |
|-----------------------------|
| 3.814697265625e-06 |
+-----------------------------+
SELECT 1
Time: 0.006s
Waiting for 1 seconds before repeating
+-----------------------------+
| pg_notification_queue_usage |
|-----------------------------|
| 2.86102294921875e-06 |
+-----------------------------+
SELECT 1
Time: 0.005s
Waiting for 1 seconds before repeating
+-----------------------------+
| pg_notification_queue_usage |
|-----------------------------|
| 3.814697265625e-06 |
+-----------------------------+
SELECT 1
Time: 0.006s
Waiting for 1 seconds before repeating
+-----------------------------+
| pg_notification_queue_usage |
|-----------------------------|
| 4.76837158203125e-06 |
+-----------------------------+
SELECT 1
Time: 0.005s
Waiting for 1 seconds before repeating
+-----------------------------+
| pg_notification_queue_usage |
|-----------------------------|
| 3.814697265625e-06 |
+-----------------------------+
SELECT 1
Time: 0.005s
Waiting for 1 seconds before repeating
+-----------------------------+
| pg_notification_queue_usage |
|-----------------------------|
| 4.76837158203125e-06 |
+-----------------------------+
```
These numbers are from my local machine; I suspect that if pushed to the cloud, the dominating factor will be the network latency. My dev setup: local Postgres in Docker, on an M1 Mac.

```
uv run tools/benchmark.py -t10 --latency
Settings:
Timer: 10.0 seconds
Dequeue: 5
Dequeue Batch Size: 10
Enqueue: 1
Enqueue Batch Size: 10
89.7k job [00:10, 8.96k job/s]
Queue size:
0
========== Benchmark Results ==========
Number of Samples: 12287
Min Latency: 0.000377 seconds
Mean Latency: 0.000924 seconds
Median Latency: 0.000896 seconds
Max Latency: 0.011628 seconds
========================================
```

This PR contains the updated benchmark script if you want to give it a try on your setup: #226
Hi y'all. We're running into a similar issue here while trying to handle database connection drops. We're using asyncio tasks to manage the QueueManager.

It does the exponential backoff, but then we just continue to see this log infinitely, and the task never exits so that we can restart the queue manager and pick up the connection again. Is there a way to either: …
Also want to be transparent, we have not done a lot with asyncio, so I may be misunderstanding some of the primitives here. Below are some snippets for reference.

Setup:

```python
import asyncio
import atexit
import logging

import psycopg

# pgqueuer import paths may differ slightly between versions
from pgqueuer.db import PsycopgDriver
from pgqueuer.qm import QueueManager

logger = logging.getLogger(__name__)


async def create_queue_manager(db_url: str) -> QueueManager:
    """Factory function to create a QueueManager instance"""
    connection = await psycopg.AsyncConnection.connect(conninfo=db_url, autocommit=True)
    pg_driver = PsycopgDriver(connection)
    return QueueManager(pg_driver)


class QueueWorker:
    def __init__(self, db_url: str) -> None:
        self.db_url = db_url
        self.task = None
        self._stop = False
        # Create the factory function that will be used to initialize QueueManager
        self.factory = lambda: create_queue_manager(self.db_url)

    async def start(self) -> None:
        """Start the queue worker"""
        qm = await self.factory()
        self.task = asyncio.create_task(self._run_worker(qm))
        self._restart_task = asyncio.create_task(self.restart_on_failure())

    async def restart_on_failure(self) -> None:
        while not self._stop:
            if self.task and self.task.done():
                await self.start()
            await asyncio.sleep(1)

    def stop(self) -> None:
        self._stop = True

    @staticmethod
    def __setup_graceful_shutdown(qm: QueueManager) -> None:
        atexit.register(qm.shutdown.set)

    async def check_connection(self, qm: QueueManager) -> None:
        while True:
            try:
                # Run a simple query through the QueueManager's connection
                await qm.connection.execute("SELECT 1")
            except Exception as e:
                logger.error(f"Connection check failed: {e}")
                raise RuntimeError("Database connection lost")
            await asyncio.sleep(1)
```

Task handling:

```python
# Presumably the body of QueueWorker._run_worker(self, qm)
connection_check_task = asyncio.create_task(self.check_connection(qm))
queue_manager_task = asyncio.create_task(qm.run())
try:
    await asyncio.wait(
        [connection_check_task, queue_manager_task],
        return_when=asyncio.FIRST_COMPLETED,
    )
    if connection_check_task.done():
        await connection_check_task
except Exception as e:
    logger.error(f"Error in queue worker tasks: {e}")
    raise
finally:
    qm.shutdown.set()
    connection_check_task.cancel()
    queue_manager_task.cancel()
    try:
        await queue_manager_task
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logger.error(f"Error while shutting down queue manager: {e}")
```
Welcome and thanks for posting. Have you tried the following?

```
python -m pgqueuer run <factory-function-string> --restart-on-failure
```

This will spin up a new service as soon as it detects that the connection is down; you can adjust your 'detection time' by setting …
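For example (a rough sketch, assuming your create_queue_manager factory lives in a module called worker.py and can be called without arguments, e.g. by reading the DB URL from an environment variable; the module and variable names are placeholders):

```python
# worker.py (hypothetical module name)
import os

import psycopg

# pgqueuer import paths may differ slightly between versions
from pgqueuer.db import PsycopgDriver
from pgqueuer.qm import QueueManager


async def create_queue_manager() -> QueueManager:
    # The runner imports and calls this factory itself, so it takes no arguments.
    connection = await psycopg.AsyncConnection.connect(
        conninfo=os.environ["DATABASE_URL"], autocommit=True
    )
    qm = QueueManager(PsycopgDriver(connection))
    # Register your entrypoints on qm here before returning it.
    return qm
```

and then start it with something like `python -m pgqueuer run worker.create_queue_manager --restart-on-failure`.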
Thanks for the quick reply @janbjorge! I may be misunderstanding, but I don't think the CLI is an option for us here, since we're just running the worker queue on our server. I've tried to replicate the restart-on-failure code from it, and have been successful in picking up a new connection, but I still see the flush-jobs log continuing. It seems that no matter what I do, it hangs there indefinitely, even if I'm successful in continuing through the queue otherwise. This might be an OK state functionality-wise, but it worries me to potentially leave it hanging there.
Me referring to it as the CLI tool might be a bit misleading, but the … If this doesn't meet your requirements, I'd appreciate it if you could clarify why, as this scenario is one of its intended use cases. Regarding the issue you're encountering, it seems that your code might be stuck in this loop: … I've run into similar issues before when using the signal module. There's a chance something similar could be happening with …
@janbjorge Hello! Just got the same issue as described above: …

Tried running with the --restart-on-failure option.
This works as expected only if there are no jobs in execution at the moment of failure; in that case the worker is automatically restarted and reconnected. However, if there is at least one job running at the moment the database connection fails, that results in an infinite loop of retries which never ends. As far as I understand, this is happening in this method:

```python
...
async with (
    buffers.JobStatusLogBuffer(
        max_size=batch_size,
        timeout=job_status_log_buffer_timeout,
        callback=self.queries.log_jobs,
    ) as jbuff,
...
```

Any ideas how to overcome this? Maybe some patch?
Digging into it now, great report @AntonKarabaza. BTW, do you simulate an outage? If so, how? EDIT: I believe I've identified the issue. Based on my investigation, I'm 95% sure the problem is here: lines 204 to 205 in 33ee624.
When an exception occurs (e.g., a database outage), the buffer tries to flush its contents to the database before exiting. However, if the database is down, this leads to the buffer being stuck in a loop. My current idea for a solution is to add exceptions for database errors so they can be ignored. That said, I'm not entirely satisfied with this approach, and I'm open to alternative suggestions. I've created a PR that includes a test case to demonstrate the issue where it gets stuck.
Hi @AntonKarabaza, I've implemented a fix for the infinite retry loop in the JobStatusLogBuffer during connection failures. Instead of ignoring database errors (which felt overly complex), I opted for a backoff strategy. This introduces delays between retries, preventing excessive flushing attempts while ensuring recovery when the connection is restored (deadlines etc. to be determined). The PR includes this fix along with tests simulating outages. Could you help verify it in your environment, particularly with jobs in execution during failures? Please let me know if it resolves the issue or if you encounter anything unexpected.
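Conceptually, the idea is along these lines (an illustrative sketch only, not the actual PgQueuer code; names and limits are made up):

```python
import asyncio
import random
from typing import Awaitable, Callable


async def flush_with_backoff(
    flush: Callable[[], Awaitable[None]],
    initial_delay: float = 0.01,
    max_delay: float = 10.0,
) -> None:
    # Retry the flush with exponentially growing, jittered delays instead of
    # hammering a database that is currently unreachable; give up once the
    # delay reaches its cap so the surrounding service can shut down and be
    # restarted with a fresh connection.
    delay = initial_delay
    while delay < max_delay:
        try:
            await flush()
            return
        except Exception:
            await asyncio.sleep(delay * (1 + 0.1 * random.random()))
            delay = min(delay * 2, max_delay)
```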
@janbjorge Great! Thank you for such a quick response and fix! I've tested the patch and it works! As for your question about how I simulate the outage - I simply have it as follows: …

With the fix I've only noticed that it is actually taking longer than expected to exit this loop:

```python
...
while self.shutdown_backoff.current_delay < self.shutdown_backoff.max_limit:
...
```

I believe it is taking longer because of this check:

```python
...
if self.lock.locked() or helpers.utc_now() < self.next_flush:
    return
...
```

By saying "taking longer" I mean it is writing this log several more times before finishing the work: … At first glance I thought that it would exit the first time the retry reached 10 seconds. But in any case, there is no infinite loop now, which is great.
Hi @AntonKarabaza, thank you for your observations. I see now that the term … is a bit misleading. It might make sense to expose these backoff-related settings (like max_limit and other parameters) in the runner configuration. This way, users could fine-tune the behavior based on their specific workloads and constraints. What are your thoughts on making these settings configurable? Would exposing them help in your use case or others you can foresee? EDIT: Made a few more adjustments, any chance of testing once more @AntonKarabaza?
@janbjorge Hello! No problem, tested the latest changes - they also work!

Test outcomes

This time I've noticed that the worker retries for some time before exiting as expected, but then logs the retries multiple times at once and exits after that. Please see the logs below about this behaviour: …

After that comes an exception in …

And then again multiple times at once (without actual waiting): …

Then an exception in …

And after that the worker shuts down and is restarted automatically. Probably this is because of this part in …:

```python
await asyncio.sleep(
    0
    if self.shutdown.is_set()
    else helpers.timeout_with_jitter(delay).total_seconds()
)
```

Params exposure

As for your question about exposing these retry parameters - I definitely can see that this can be helpful in some situations. But for now, in our case, I think it is not required to have those exposed.
@AntonKarabaza thanks for testing, the sequence you outline is what I expected to see. I want to give the system some leeway to recover before doing a restart. Regarding the parameters, I think I will leave them hard-coded for now (I will do a follow-up where I will allow adjustments via OS environment variables).
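For the follow-up, the adjustment could be as simple as something like this (purely illustrative; these environment variable names are made up and not part of PgQueuer today):

```python
import os
from datetime import timedelta

# Hypothetical environment-driven overrides for the shutdown backoff.
initial_backoff = timedelta(seconds=float(os.environ.get("PGQ_BACKOFF_INITIAL", "0.01")))
max_backoff_limit = timedelta(seconds=float(os.environ.get("PGQ_BACKOFF_MAX_LIMIT", "10")))
```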
I've pasted a small working example here of how to make sure your worker won't get stuck with a lost connection. TL;DR is that you either need to handle …
I noticed that if I restart my local dev db (connected via a Docker network, i.e. a postgres:5432 host), AsyncpgDriver just silently drops the connection and PgQueuer stops listening. If I then restart PgQueuer itself, it'll pick up the queued jobs and listen again.

Does it do that for everyone else, or is it because I'm using a bit of a frankensetup with FastAPI wrapping QueueManager, which is run using asyncio.create_task(qm.run())?