Skip to content

Commit

Permalink
fix mqtt subscription reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
Umang01-hash committed Feb 17, 2025
1 parent ad64d87 commit e77a32d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
22 changes: 16 additions & 6 deletions pkg/gofr/datasource/pubsub/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
const (
publicBroker = "broker.emqx.io"
messageBuffer = 10
defaultRetryTimeout = 10 * time.Second
defaultRetryTimeout = 5 * time.Second
)

var errClientNotConnected = errors.New("client not connected")
Expand Down Expand Up @@ -71,7 +71,7 @@ func New(config *Config, logger Logger, metrics Metrics) *MQTT {

logger.Debugf("connecting to MQTT at '%v:%v' with clientID '%v'", config.Hostname, config.Port, config.ClientID)

options.SetOnConnectHandler(createReconnectHandler(mu, config, subs))
options.SetOnConnectHandler(createReconnectHandler(mu, config, subs, logger))
options.SetConnectionLostHandler(createConnectionLostHandler(logger))
options.SetReconnectingHandler(createReconnectingHandler(logger, config))
// create the client using the options above
Expand All @@ -98,7 +98,9 @@ func (m *MQTT) Subscribe(ctx context.Context, topic string) (*pubsub.Message, er
token := m.Client.Subscribe(topic, m.config.QoS, subs.handler)

if token.Wait() && token.Error() != nil {
m.mu.Unlock()
m.logger.Errorf("error getting a message from MQTT, error: %v", token.Error())

return nil, token.Error()
}

Expand Down Expand Up @@ -322,6 +324,7 @@ func (m *MQTT) Ping() error {
func retryConnect(client mqtt.Client, config *Config, logger Logger, options *mqtt.ClientOptions) {
for {
token := client.Connect()

if token.Wait() && token.Error() != nil {
logger.Errorf("could not connect to MQTT at '%v:%v', error: %v", config.Hostname, config.Port, token.Error())

Expand All @@ -336,13 +339,20 @@ func retryConnect(client mqtt.Client, config *Config, logger Logger, options *mq
}
}

func createReconnectHandler(mu *sync.RWMutex, config *Config, subs map[string]subscription) func(c mqtt.Client) {
return func(c mqtt.Client) {
func createReconnectHandler(mu *sync.RWMutex, config *Config, subs map[string]subscription,
logger Logger) mqtt.OnConnectHandler {
return func(client mqtt.Client) {
// Re-subscribe to all topics after reconnecting
mu.RLock()
defer mu.RUnlock()

for k, v := range subs {
c.Subscribe(k, config.QoS, v.handler)
for topic, sub := range subs {
token := client.Subscribe(topic, config.QoS, sub.handler)
if token.Wait() && token.Error() != nil {
logger.Debugf("failed to resubscribe to topic %s: %v", topic, token.Error())
} else {
logger.Debugf("resubscribed to topic %s successfully", topic)
}
}
}
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/gofr/datasource/pubsub/mqtt/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,16 @@ func TestReconnectHandler(t *testing.T) {
qos := byte(1)

clientMock := NewMockClient(ctrl)
clientMock.EXPECT().Subscribe("topic1", qos, gomock.Any()).Return(nil)
tokenMock := NewMockToken(ctrl)

clientMock.EXPECT().Subscribe("topic1", qos, gomock.Any()).Return(tokenMock)

tokenMock.EXPECT().Wait().Return(true)
tokenMock.EXPECT().Error().Return(nil)

mockLogger := NewMockLogger(ctrl)

mockLogger.EXPECT().Debugf("resubscribed to topic %s successfully", "topic1")

msgsChan := make(chan *pubsub.Message)

Expand All @@ -506,7 +515,7 @@ func TestReconnectHandler(t *testing.T) {
Port: 1234,
ClientID: "gopher",
QoS: qos,
}, subs)
}, subs, mockLogger)

assert.NotNil(t, reconnectHandler)

Expand Down

0 comments on commit e77a32d

Please sign in to comment.