
consume very slow when Inflights to many #340

Closed
VinsonWood opened this issue Dec 7, 2023 · 5 comments
Labels: bug (Something isn't working), discussion (Something to be discussed)

Comments

VinsonWood commented Dec 7, 2023

Hello, I am using Mochi to test offline messages. I have two clients subscribed to the same topic at QoS 1, both with clean session disabled, and the topic continuously receives QoS 1 data. When one of the clients disconnects, the inflight queue for that client keeps accumulating, causing a continuous increase in memory usage. Once memory consumption reaches around 300-400 MB, the client that is still connected can only consume messages at an extremely slow rate, far below the publish rate on this topic. If the disconnected client reconnects, its inflight messages and the associated memory are released, and the consumption rate returns to normal.
I know I can use a hook to limit the size of the inflight queue, and that after a client disconnects I can check the size of that client's inflight queue and clear it. Are there any other, more elegant solutions available?
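For reference, a rough sketch of the clear-on-disconnect hook I have in mind, assuming the standard v2 custom-hook pattern from the README; the GetAll/Delete method names on cl.State.Inflight are what I expect them to be, so please double-check them against the version you run:

    package main

    import (
        "bytes"

        mqtt "github.com/mochi-mqtt/server/v2"
    )

    // clearInflightOnDisconnect drops a client's pending inflight messages when it
    // disconnects, so an offline subscriber cannot accumulate unbounded state.
    type clearInflightOnDisconnect struct {
        mqtt.HookBase
    }

    func (h *clearInflightOnDisconnect) ID() string {
        return "clear-inflight-on-disconnect"
    }

    func (h *clearInflightOnDisconnect) Provides(b byte) bool {
        return bytes.Contains([]byte{mqtt.OnDisconnect}, []byte{b})
    }

    func (h *clearInflightOnDisconnect) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
        // Remove every queued inflight packet belonging to the departing client.
        for _, pk := range cl.State.Inflight.GetAll(false) {
            cl.State.Inflight.Delete(pk.PacketID)
        }
    }

    func main() {
        server := mqtt.New(nil)
        _ = server.AddHook(new(clearInflightOnDisconnect), nil)
        // ... add listeners and call server.Serve() as usual.
    }

Dropping the queue like this also gives up redelivery of those messages when the client reconnects, which is why I am asking whether there is a more elegant option.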

VinsonWood changed the title from "consumer very slow when Inflights to many" to "consume very slow when Inflights to many" on Dec 7, 2023
thedevop (Collaborator) commented Dec 13, 2023

@mochi-co, currently we store all QoS > 0 messages, even if the receive quota is exceeded:

server/server.go

Lines 971 to 993 in 624dde0

if out.FixedHeader.Qos > 0 {
    i, err := cl.NextPacketID() // [MQTT-4.3.2-1] [MQTT-4.3.3-1]
    if err != nil {
        s.hooks.OnPacketIDExhausted(cl, pk)
        s.Log.Warn("packet ids exhausted", "error", err, "client", cl.ID, "listener", cl.Net.Listener)
        return out, packets.ErrQuotaExceeded
    }

    out.PacketID = uint16(i) // [MQTT-2.2.1-4]
    sentQuota := atomic.LoadInt32(&cl.State.Inflight.sendQuota)

    if ok := cl.State.Inflight.Set(out); ok { // [MQTT-4.3.2-3] [MQTT-4.3.3-3]
        atomic.AddInt64(&s.Info.Inflight, 1)
        s.hooks.OnQosPublish(cl, out, out.Created, 0)
        cl.State.Inflight.DecreaseSendQuota()
    }

    if sentQuota == 0 && atomic.LoadInt32(&cl.State.Inflight.maximumSendQuota) > 0 {
        out.Expiry = -1
        cl.State.Inflight.Set(out)
        return out, nil
    }
}

With Expiry = -1, it also appears the message will never be removed from Inflight. This will also make cl.NextPacketID very slow and eventually fail.

Perhaps we can make the following changes (a rough sketch follows the list):

  • If QoS > 0, immediately check whether the client's quota has been reached
    • If it has been, drop the message for this client and call the OnQosDropped hook
    • If it has not, store the message
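As a rough illustration only (a reshuffle of the snippet quoted above, not an actual implementation; I am assuming OnQosDropped takes (cl, pk) like the other QoS hooks), the flow could look something like:

if out.FixedHeader.Qos > 0 {
    // Check the send quota up front, before allocating a packet ID or storing anything.
    if atomic.LoadInt32(&cl.State.Inflight.maximumSendQuota) > 0 &&
        atomic.LoadInt32(&cl.State.Inflight.sendQuota) == 0 {
        s.hooks.OnQosDropped(cl, out) // quota reached: drop the message for this client
        return out, packets.ErrQuotaExceeded
    }

    i, err := cl.NextPacketID() // [MQTT-4.3.2-1] [MQTT-4.3.3-1]
    if err != nil {
        s.hooks.OnPacketIDExhausted(cl, pk)
        s.Log.Warn("packet ids exhausted", "error", err, "client", cl.ID, "listener", cl.Net.Listener)
        return out, packets.ErrQuotaExceeded
    }

    out.PacketID = uint16(i) // [MQTT-2.2.1-4]
    if ok := cl.State.Inflight.Set(out); ok { // [MQTT-4.3.2-3] [MQTT-4.3.3-3]
        atomic.AddInt64(&s.Info.Inflight, 1)
        s.hooks.OnQosPublish(cl, out, out.Created, 0)
        cl.State.Inflight.DecreaseSendQuota()
    }
}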

thedevop added the bug (Something isn't working) and discussion (Something to be discussed) labels on Dec 13, 2023
thedevop (Collaborator) commented Jan 14, 2024

@mochi-co, PR #359 adds the ability to configure the maximum number of QoS > 0 messages to store, which also means no client's send quota will exceed this value.

With this PR, if the number of messages to be stored exceeds MaximumInflight, s.Info.InflightDropped is incremented and the drop is logged. Should we also call OnQosDropped (even though the message was never stored, so OnQosPublish was never called)?

thedevop (Collaborator) commented Feb 6, 2024

@VinsonWood, you can now configure the maximum number of inflight messages per client (Capabilities.MaximumInflight) as of v2.4.6. Please let us know if it solves your issue.
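For anyone finding this later, a minimal sketch of that configuration (Capabilities.MaximumInflight is the new field from v2.4.6; the rest mirrors the README setup for that release, 128 is just an arbitrary example value, and the listeners API may differ in other releases, so check the README for the version you use):

    package main

    import (
        "log"

        mqtt "github.com/mochi-mqtt/server/v2"
        "github.com/mochi-mqtt/server/v2/hooks/auth"
        "github.com/mochi-mqtt/server/v2/listeners"
    )

    func main() {
        server := mqtt.New(nil) // nil options fall back to the default capabilities

        // Cap how many QoS 1/2 messages may be held inflight per client.
        server.Options.Capabilities.MaximumInflight = 128

        _ = server.AddHook(new(auth.AllowHook), nil)

        if err := server.AddListener(listeners.NewTCP("t1", ":1883", nil)); err != nil {
            log.Fatal(err)
        }
        if err := server.Serve(); err != nil {
            log.Fatal(err)
        }

        select {} // block forever; handle shutdown signals properly in real code
    }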

VinsonWood (Author) commented

@thedevop Thank you! Let me try it.

VinsonWood (Author) commented

@thedevop It works, thank you!
