Pub/Sub API Redesign #3859
Conversation
I really want to see a green build; #3860 merged and trying again.
THE BUILD IS GREEN! Here we go!
This might have broken AppEngine deployments: #3863
Thank you for this work to make the Python Pub/Sub client more performant. I would like to suggest that you update the main documentation with more information about how the asynchronous nature of the client is actually implemented. It's clear after reading the source that you are creating and managing your own background executor, but I think that is information users would like to understand. I also think it would be useful for a user to have some information about how this could be combined with an asyncio application.
Hi @robhaswell!
I am totally willing to consider this. :-) I tried not to go too deeply into implementation details because I was concerned that it would be distracting to people who just need to know how to use it. Can you explain the use case you are trying to figure out, so that I can understand my audience? I could totally see a case for an "Advanced: Bring your own concurrency" type of page that explains how to do the stuff I spent so much time making sure was pluggable. :-) Basically, happy to oblige, but I need some guidance as to what you need. In the meantime, I sincerely hope the in-code documentation is being helpful, and I would appreciate feedback if it is not.
Thanks @lukesneeringer!
Sure, the program I'm working on is rather simple. Its purpose is to read objects from our primary database (Postgres) and then write denormalised representations of them to our frontend database (Elasticsearch), a process we call indexing. Object UUIDs to be indexed come out of a Pub/Sub queue, which is fed by the program that got these objects into Postgres in the first place. Now, it takes quite a long time (comparatively) to build these Elastic representations, but Postgres can sustain a very high read concurrency, so we pump an asyncio.Queue that feeds several concurrent workers:

```python
import asyncio

from .private.indexer import process_to_elastic
from .private.pubsub import Subscription

CONCURRENCY = 8

queue = asyncio.Queue(maxsize=10)


async def work_producer():
    while True:
        messages = Subscription.pull_many()
        for message in messages:
            await queue.put(message.data)


async def index_worker():
    while True:
        work = await queue.get()
        await process_to_elastic(work)


async def begin():
    await asyncio.gather(
        work_producer(), *(index_worker() for _ in range(CONCURRENCY))
    )


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(begin())
    loop.close()
```

As a side note, I think the current implementation ("we provide a producer that you attach a callback to") would be suboptimal for us. Our indexing process can lock up for a variety of reasons, so we need to be able to apply backpressure to the production of messages from the Pub/Sub queue; otherwise memory could grow unbounded, and we risk flooding some poor database with millions of requests in the same instant. I hope this helps! Oh, and the in-code documentation has served me fine so far, thanks.
It does help. A few comments and thoughts:

**Flow Control**
We sort of have what you want here. Subscriptions have something called "flow control", which is basically a maximum number of unacked messages that they will work on. You could set the max number of messages pretty low, and the client library will stop reading messages from Pub/Sub until the outstanding ones are dealt with. (It will not error, though.) I think it would be mostly equivalent to what you have here to simply set the max messages to 10.
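A minimal (untested) sketch of what that looks like, using the `types.FlowControl` settings object; `SUBSCRIPTION_PATH` and `process` are placeholders for your own values:

```python
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()

# Never let more than 10 messages be outstanding (leased but unacked).
# The client stops pulling from Pub/Sub until slots free up.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)


def callback(message):
    process(message.data)  # placeholder for the indexing work
    message.ack()          # acking the message frees a flow control slot


subscriber.subscribe(SUBSCRIPTION_PATH, callback=callback,
                     flow_control=flow_control)
```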
**asyncio**

something something something envy that you get to write Python 3 only code. 😠 😛 I think this should be possible with a subclass of the subscriber `Policy` class.
Thanks for getting back to me.
Yes, maximum unacked messages has the same effect as our implementation. Sorry, I didn't spot that in the documentation; perhaps it is supposed to be detailed at http://google-cloud-python.readthedocs.io/en/latest/pubsub/subscriber/index.html, which is blank for me (I should probably file that... somewhere).
Yeah it's pretty nice 😄 I mean, I think GCP would benefit from an asyncio client.
Hi @lukesneeringer, I made an example of how one would mix this client with an asyncio event loop. Hope this helps.
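A minimal sketch of that kind of mix (the original example is not reproduced here), assuming the client delivers callbacks on a background thread; `process_to_elastic` is the hypothetical indexing coroutine from above, and the flow control settings supply the backpressure discussed earlier:

```python
import asyncio

from google.cloud import pubsub_v1

SUBSCRIPTION_PATH = "projects/my-project/subscriptions/my-sub"  # placeholder


async def main():
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()

    def callback(message):
        # Runs on the client's background thread; hand the message to the
        # event loop thread without blocking either side.
        loop.call_soon_threadsafe(queue.put_nowait, message)

    subscriber = pubsub_v1.SubscriberClient()
    flow_control = pubsub_v1.types.FlowControl(max_messages=10)
    subscriber.subscribe(SUBSCRIPTION_PATH, callback=callback,
                         flow_control=flow_control)

    while True:
        message = await queue.get()
        await process_to_elastic(message.data)  # hypothetical coroutine
        message.ack()


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())
```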
This PR represents an entirely redesigned Pub/Sub library, and is the culmination of #3463 (publisher), #3637 (subscriber and tests), and #3849 (documentation). Merging this PR will merge the rewritten library into master.
This is the final sanity check for a significant change (the library is rewritten from scratch). Once approved, we need to do a final version number bump and then a launch.