Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OnExpireInflights method has serious performance problems #125

Closed
wind-c opened this issue Dec 21, 2022 · 4 comments
Closed

OnExpireInflights method has serious performance problems #125

wind-c opened this issue Dec 21, 2022 · 4 comments
Assignees
Labels
enhancement New feature or request

Comments

@wind-c
Copy link
Contributor

wind-c commented Dec 21, 2022

1.The OnExpireInflights method implementation of Storage hook(bolt/badger/redis) has serious performance problems. That's because loading all Inflight messages from storage into memory for processing can cause congestion and CPU and memory to balloon.
2.The Server#clearExpiredInflights calling client.ClearInflights and then executing s.hooks.OnExpireInflights is a repeat action?

@mochi-co
Copy link
Collaborator

  1. I agree with this point, it's a good observation! I think we can avoid this by passing the packet id through the clearExpiredInflights (becomes clearExpiredInflight) and having the storage delete the message directly instead of getting all values from memory.
  2. I don't think so, as client.ClearInflights clears the in memory store and s.hooks.OnExpireInflights operates on the store. Unless I am misunderstanding?

I will make the change for (1) and release it as 2.0.8 along with the other pending changes 👍🏻

@mochi-co mochi-co added the enhancement New feature or request label Dec 21, 2022
@mochi-co mochi-co self-assigned this Dec 21, 2022
@wind-c
Copy link
Contributor Author

wind-c commented Dec 21, 2022

func (cl *Client) ClearInflights(now, maximumExpiry int64) int64 { var deleted int64 for _, tk := range cl.State.Inflight.GetAll(false) { if (tk.Expiry > 0 && tk.Expiry < now) || tk.Created+maximumExpiry < now { if ok := cl.State.Inflight.Delete(tk.PacketID); ok { cl.ops.hooks.OnQosDropped(cl, tk) atomic.AddInt64(&cl.ops.info.Inflight, -1) deleted++ } } } return deleted }

func (s *Server) clearExpiredInflights(now int64) { for _, client := range s.Clients.GetAll() { if d := client.ClearInflights(now, s.Options.Capabilities.MaximumMessageExpiryInterval); d > 0 { s.hooks.OnExpireInflights(client, now) } } }

cl.ops.hooks.OnQosDropped method has imposed removed messages from storage. s.hooks.OnExpireInflights method is also removed messages from storage.

`func (h *Hook) OnExpireInflights(cl *mqtt.Client, expiry int64) {
...

for _, row := range rows {
	...

	if d.Created < expiry || d.Created == 0 {
		err := h.db.HDel(h.ctx, h.hKey(storage.InflightKey), d.ID).Err()
		if err != nil {
			h.Log.Error().Err(err).Str("id", clientKey(cl)).Msg("failed to delete inflight message data")
		}
	}
}

}`

Does this condition(if d.Created < expiry || d.Created == 0) cause all messages to be deleted?

@mochi-co
Copy link
Collaborator

Hi @wind-c! I think we can use OnQosDropped in place of a new OnInflightExpired as expired inflights are dropped inflights. I will incorporate that into my fix to avoid implementing duplicate behaviour 👍

  1. No, this code will delete inflights with no created time - this was a safety mechanism in case someone tried to use v1 persistence or if someone uses the packages and adds inflights with no created date - they would otherwise be stuck forever.

If you want to delete all inflights, see in s.inheritClientSession:

existing.ClearInflights(math.MaxInt64, 0)

I'll try to get the inflight expiry cleaned up later this afternoon/evening 👍🏻

@mochi-co
Copy link
Collaborator

Fixed in v2.1.0 - OnExpireInflights has been removed, and onQosDropped is used in its place. Thanks again for your help, @wind-c!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants