AsyncWebsocketClient uses ever-increasing memory with subsequent requests #709

Open
ubuddemeier opened this issue May 22, 2024 · 11 comments

Comments

@ubuddemeier

I think the problem is that received messages are enqueued (in WebsocketBase._handler) regardless of whether the message corresponds to a request or not:

https://github.com/XRPLF/xrpl-py/blob/main/xrpl/asyncio/clients/websocket_base.py#L139

This should probably be inside an `else` clause of the preceding `if`.
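To make the proposed change concrete, here is a minimal, self-contained toy model of the dispatch step described above. `ToyHandler`, `simulate`, and the `fixed` flag are hypothetical names for illustration only, not xrpl-py's actual code. With `fixed=False`, every response is enqueued even after its Future has been resolved, so the queue grows by one per request when nothing reads it. With `fixed=True`, only messages without a matching pending request are enqueued.

```python
import asyncio
from typing import Any, Dict


class ToyHandler:
    """Hypothetical stand-in for the dispatch step in WebsocketBase._handler."""

    def __init__(self, fixed: bool) -> None:
        self.fixed = fixed
        self.open_requests: Dict[Any, "asyncio.Future[Dict[str, Any]]"] = {}
        self.messages: "asyncio.Queue[Dict[str, Any]]" = asyncio.Queue()

    def handle(self, response: Dict[str, Any]) -> None:
        request_id = response.get("id")
        if request_id in self.open_requests:
            # A caller is awaiting this response: resolve its Future.
            self.open_requests.pop(request_id).set_result(response)
            if self.fixed:
                return  # fixed variant: do not enqueue it as well
        # Unsolicited messages always end up here; in the buggy variant,
        # responses to requests are enqueued too.
        self.messages.put_nowait(response)


async def simulate(fixed: bool, n_requests: int) -> int:
    """Issue n_requests toy requests without ever reading the queue."""
    handler = ToyHandler(fixed)
    loop = asyncio.get_running_loop()
    for request_id in range(n_requests):
        future = loop.create_future()
        handler.open_requests[request_id] = future
        handler.handle({"id": request_id, "result": {}})
        await future  # the caller still receives its response via the Future
    return handler.messages.qsize()  # messages that nobody has read


if __name__ == "__main__":
    print(asyncio.run(simulate(fixed=False, n_requests=1000)))  # 1000
    print(asyncio.run(simulate(fixed=True, n_requests=1000)))  # 0
```

In both variants, messages without a matching `id` (such as subscription stream events) still go to the queue, so they remain readable.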

@ckeshava
Collaborator

ckeshava commented Jun 6, 2024

Hello, thanks for pointing this out.

How did you observe the increasing memory load? Are you using any profiler?

ckeshava added a commit to ckeshava/xrpl-py that referenced this issue Jun 6, 2024
@Jbekker

Jbekker commented Jun 13, 2024

@ckeshava,
I'm noticing the same. It's most visible with larger requests such as BookOffers. If you don't iterate over the client, the queue grows continuously, and so does the memory usage.

Would it make sense to also limit the number of messages in the queue, for the case where a client can't keep up?
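One possible way to cap the queue is sketched below, under the assumption that dropping the oldest unread message is an acceptable overflow policy (it may not be for every user). `DropOldestQueue` is a hypothetical class built on `asyncio.Queue`, whose `maxsize` parameter is real. Only `put_nowait()`, the method the handler uses to enqueue, applies the drop policy here; an awaited `put()` on a full queue would still block as usual.

```python
import asyncio
from typing import Any


class DropOldestQueue(asyncio.Queue):
    """Hypothetical bounded message queue that evicts the oldest item when full."""

    def __init__(self, maxsize: int) -> None:
        if maxsize <= 0:
            raise ValueError("maxsize must be positive for a bounded queue")
        super().__init__(maxsize=maxsize)
        self.dropped = 0  # how many unread messages were discarded

    def put_nowait(self, item: Any) -> None:
        if self.full():
            self.get_nowait()  # evict the oldest unread message
            self.task_done()  # keep join() accounting consistent
            self.dropped += 1
        super().put_nowait(item)
```

Whether to drop, block, or raise on overflow is the kind of choice a configurable limit would need to expose.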

```python
import asyncio

from xrpl.asyncio.clients import AsyncWebsocketClient
from xrpl.models.currencies import XRP, IssuedCurrency
from xrpl.models.requests import BookOffers


async def main():
    async with AsyncWebsocketClient("wss://s1.ripple.com") as client:
        # Repeatedly fetch both sides of the XRP/USD order book without ever
        # iterating over `client`, so nothing drains its message queue.
        while True:
            orderbook_asks_info = await client.request(
                BookOffers(
                    ledger_index="current",
                    taker_gets=XRP(),
                    taker_pays=IssuedCurrency(
                        currency="USD",
                        issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq",
                    ),
                    limit=500,
                )
            )

            orderbook_bids_info = await client.request(
                BookOffers(
                    ledger_index="current",
                    taker_gets=IssuedCurrency(
                        currency="USD",
                        issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq",
                    ),
                    taker_pays=XRP(),
                    limit=500,
                )
            )

            # `_messages` is a private attribute; while the bug is present it
            # grows by two per iteration, because each response is also enqueued.
            print(f"Queued items: {client._messages.qsize()}")
            await asyncio.sleep(2)


if __name__ == "__main__":
    asyncio.run(main())
```

@ckeshava
Collaborator

thanks, this is helpful @Jbekker

@ckeshava
Collaborator

ckeshava commented Jun 14, 2024

@Jbekker
I have incorporated your test case at the tip of this branch: #713 -- many thanks, this is insightful.

Regarding placing limits on the queue size: a client might want to send a large number of requests to a rippled node. For instance, we don't want to rate-limit access to a personal rippled process. (Hopefully, once this bug is resolved, there won't be any unforeseen spikes in compute or memory usage.) Users can always limit this Python process with operating system tools.

That said, I understand this approach runs contrary to how message queues in OS network buffers work, since those are bounded. I don't have a strong opinion; I'm open to ideas.

@mvadari
Collaborator

mvadari commented Jun 25, 2024

> Regarding placing limits on the queue size: A client might want to send in a large number of requests to a rippled node. For instance - We don't want to rate-limit access to a personal rippled process.

There could be some sort of config option added.

@ckeshava
Collaborator

> > Regarding placing limits on the queue size: A client might want to send in a large number of requests to a rippled node. For instance - We don't want to rate-limit access to a personal rippled process.
>
> There could be some sort of config option added.

Yeah, that's true. To determine a good default value (an upper bound on the queue size), we might have to pull some statistics. Do you have any ideas for that? I don't think the Data team currently tracks the client libraries.

Adding a config option might increase the complexity of the client libraries, but I think it's an acceptable compromise.
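If a config option were added, one hypothetical shape is a `max_queue_size` argument passed straight through to `asyncio.Queue(maxsize=...)`, where `0` (the default) already means unbounded, so existing behavior would not change. To gather data for a sensible default, a queue could record its high-water mark, as in this sketch. `PeakTrackingQueue` and `peak_size` are illustrative names, not part of xrpl-py; the override hooks `put_nowait()`, the method the handler uses to enqueue.

```python
import asyncio
from typing import Any


class PeakTrackingQueue(asyncio.Queue):
    """Hypothetical message queue that remembers the largest size it reached."""

    def __init__(self, maxsize: int = 0) -> None:
        # asyncio.Queue treats maxsize <= 0 as unbounded.
        super().__init__(maxsize=maxsize)
        self.peak_size = 0

    def put_nowait(self, item: Any) -> None:
        super().put_nowait(item)
        self.peak_size = max(self.peak_size, self.qsize())
```

Logging `peak_size` from real workloads would give an empirical upper bound to reason about, rather than a guessed default.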

@ubuddemeier
Author

Question: does the response need to go into the message queue if it corresponds to a pending request and is handled by fulfilling the future?
I did a quick fix on my end to work around the problem in this way:
```python
# if this response corresponds to request, fulfill the Future
if "id" in response_dict and response_dict["id"] in self._open_requests:
    self._open_requests[response_dict["id"]].set_result(response_dict)
else:  # enqueue the response for the message queue
    cast(_MESSAGES_TYPE, self._messages).put_nowait(response_dict)
```
Am I missing something here?

@ubuddemeier
Copy link
Author

Another problem I noticed earlier in the same file is that `_inject_request_id` might cause a collision when it generates a random ID while many requests are outstanding. It should check whether the randomly generated ID is already in use.
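A minimal sketch of the suggested check: keep drawing random IDs until one is not already a key in the dictionary of pending requests. `generate_unique_id`, the plain-string format, and the default range of 1,000,000 are assumptions for illustration, not xrpl-py's actual values.

```python
import random
from typing import Any, Dict


def generate_unique_id(open_requests: Dict[str, Any], upper: int = 1_000_000) -> str:
    """Hypothetical helper: draw random IDs until one is not already pending."""
    if len(open_requests) >= upper:
        # Conservative guard so the loop below cannot spin forever.
        raise RuntimeError("no free request IDs left in the chosen range")
    while True:
        candidate = str(random.randrange(upper))
        if candidate not in open_requests:
            return candidate
```

A monotonically increasing counter (for example, `itertools.count()`) would also avoid collisions among in-flight requests, without needing a retry loop.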

@ckeshava
Collaborator

ckeshava commented Jul 2, 2024

@ubuddemeier You are correct, the fix is similar: #713

Regarding the request-IDs, please refer to this discussion: #718 (comment)

For a collision to occur, the random number generator would need to produce the same output twice within a given span of time. I think the math libraries offer strong enough guarantees for pseudo-random number generation.

The requests are fulfilled very quickly (except for certain heavy workloads), so I don't think we'll see collisions within such a short span of time.
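The collision risk can be quantified with the birthday problem. Only IDs of requests that are in flight at the same time can collide, so what matters is how many requests are outstanding and how large the ID space is; even an ideal random number generator produces repeats at this rate. The sketch below computes the exact probability. The ID-space size used in the note after it (1,000,000) is an assumption, not a value taken from xrpl-py.

```python
def collision_probability(outstanding: int, id_space: int) -> float:
    """Chance that at least two of `outstanding` IDs, each drawn uniformly from
    `id_space` values, are equal:  p = 1 - prod_{i=0}^{k-1} (N - i) / N."""
    if outstanding > id_space:
        return 1.0  # pigeonhole principle: a repeat is guaranteed
    p_all_distinct = 1.0
    for i in range(outstanding):
        p_all_distinct *= (id_space - i) / id_space
    return 1.0 - p_all_distinct
```

With an assumed space of 1,000,000 IDs, `collision_probability(10, 1_000_000)` is about 4.5e-5, while `collision_probability(1000, 1_000_000)` is about 0.39, so the risk is negligible for light use but grows quickly with many concurrent requests.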

@ledhed2222
Collaborator

Hey, author here. I think you're probably right about collisions with request IDs. We should be using a dictionary to track which IDs are already in use, although I'd be surprised if you've seen a collision in practice.

However, when it comes to the message queue, we must keep every message, because that is how you are able to read all responses. Why would you want to drop received messages? Perhaps I don't understand.

@ckeshava
Collaborator

> hey author here. i think you're probably right about collisions with request IDs. we should be using a dictionary to determine what IDs are already in use, although i'm surprised if you've seen a collision in practice.

You are right, I have not seen collisions in my experience.

Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants