Skip to content

Commit

Permalink
Fix persistent session in mqtt_consumer (influxdata#6236)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and Mathieu Lecarme committed Apr 17, 2020
1 parent de6210a commit fcf5b1e
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 142 deletions.
39 changes: 21 additions & 18 deletions plugins/inputs/mqtt_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@
The [MQTT][mqtt] consumer plugin reads from the specified MQTT topics
and creates metrics using one of the supported [input data formats][].

### Configuration:
### Configuration

```toml
[[inputs.mqtt_consumer]]
## MQTT broker URLs to be used. The format should be scheme://host:port,
## scheme can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
servers = ["tcp://127.0.0.1:1883"]

## Topics that will be subscribed to.
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]

## QoS policy for messages
## 0 = at most once
Expand All @@ -18,10 +25,10 @@ and creates metrics using one of the supported [input data formats][].
##
## When using a QoS of 1 or 2, you should enable persistent_session to allow
## resuming unacknowledged messages.
qos = 0
# qos = 0

## Connection timeout for initial connection in seconds
connection_timeout = "30s"
# connection_timeout = "30s"

## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
Expand All @@ -33,21 +40,17 @@ and creates metrics using one of the supported [input data formats][].
## waiting until the next flush_interval.
# max_undelivered_messages = 1000

## Topics to subscribe to
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]
## Persistent session disables clearing of the client session on connection.
## In order for this option to work you must also set client_id to identify
## the client. To receive messages that arrived while the client is offline,
## also set the qos option to 1 or 2 and don't forget to also set the QoS when
## publishing.
# persistent_session = false

# if true, messages that can't be delivered while the subscriber is offline
# will be delivered when it comes back (such as on service restart).
# NOTE: if true, client_id MUST be set
persistent_session = false
# If empty, a random client ID will be generated.
client_id = ""
## If unset, a random client ID will be generated.
# client_id = ""

## username and password to connect MQTT server.
## Username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"

Expand All @@ -65,7 +68,7 @@ and creates metrics using one of the supported [input data formats][].
data_format = "influx"
```

### Tags:
### Metrics

- All measurements are tagged with the incoming topic, i.e.
`topic=telegraf/host01/cpu`
Expand Down
141 changes: 92 additions & 49 deletions plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ const (
Connected
)

// Client is the set of MQTT client operations used by MQTTConsumer. Keeping
// the dependency behind this interface lets a ClientFactory supply any
// implementation with these methods.
type Client interface {
	// Connect starts a connection attempt and returns a token that can be
	// waited on and inspected for an error.
	Connect() mqtt.Token
	// SubscribeMultiple subscribes to every topic filter in filters, where the
	// map value is the QoS byte for that filter, and delivers matching
	// messages to callback.
	SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token
	// AddRoute registers callback as the handler for messages on topic
	// without issuing a subscription.
	AddRoute(topic string, callback mqtt.MessageHandler)
	// Disconnect closes the connection.
	// NOTE(review): quiesce is presumably a wait time in milliseconds, as in
	// the underlying MQTT client library; confirm against its documentation.
	Disconnect(quiesce uint)
}

// ClientFactory builds a Client from the given MQTT client options.
type ClientFactory func(o *mqtt.ClientOptions) Client

type MQTTConsumer struct {
Servers []string
Topics []string
Expand All @@ -51,12 +60,13 @@ type MQTTConsumer struct {
ClientID string `toml:"client_id"`
tls.ClientConfig

client mqtt.Client
acc telegraf.TrackingAccumulator
state ConnectionState
subscribed bool
sem semaphore
messages map[telegraf.TrackingID]bool
clientFactory ClientFactory
client Client
opts *mqtt.ClientOptions
acc telegraf.TrackingAccumulator
state ConnectionState
sem semaphore
messages map[telegraf.TrackingID]bool

ctx context.Context
cancel context.CancelFunc
Expand All @@ -65,7 +75,14 @@ type MQTTConsumer struct {
var sampleConfig = `
## MQTT broker URLs to be used. The format should be scheme://host:port,
## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
servers = ["tcp://127.0.0.1:1883"]
## Topics that will be subscribed to.
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]
## QoS policy for messages
## 0 = at most once
Expand All @@ -74,10 +91,10 @@ var sampleConfig = `
##
## When using a QoS of 1 or 2, you should enable persistent_session to allow
## resuming unacknowledged messages.
qos = 0
# qos = 0
## Connection timeout for initial connection in seconds
connection_timeout = "30s"
# connection_timeout = "30s"
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
Expand All @@ -89,21 +106,17 @@ var sampleConfig = `
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Topics to subscribe to
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]
## Persistent session disables clearing of the client session on connection.
## In order for this option to work you must also set client_id to identity
## the client. To receive messages that arrived while the client is offline,
## also set the qos option to 1 or 2 and don't forget to also set the QoS when
## publishing.
# persistent_session = false
# if true, messages that can't be delivered while the subscriber is offline
# will be delivered when it comes back (such as on service restart).
# NOTE: if true, client_id MUST be set
persistent_session = false
# If empty, a random client ID will be generated.
client_id = ""
## If unset, a random client ID will be generated.
# client_id = ""
## username and password to connect MQTT server.
## Username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
Expand Down Expand Up @@ -133,7 +146,7 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) {
m.parser = parser
}

func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
func (m *MQTTConsumer) Init() error {
m.state = Disconnected

if m.PersistentSession && m.ClientID == "" {
Expand All @@ -148,23 +161,41 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return fmt.Errorf("connection_timeout must be greater than 1s: %s", m.ConnectionTimeout.Duration)
}

m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
m.ctx, m.cancel = context.WithCancel(context.Background())

opts, err := m.createOpts()
if err != nil {
return err
}

m.client = mqtt.NewClient(opts)
m.opts = opts

return nil
}

// Start prepares the consumer to receive messages and makes the initial
// connection attempt.
//
// It wraps acc in a tracking accumulator limited to MaxUndeliveredMessages,
// creates the cancellable context, builds the client from m.opts using the
// injected client factory, and registers the message handler for every
// configured topic before connecting.
//
// Start always returns nil. A failed initial connection is reported through
// the accumulator rather than being silently dropped; connect leaves the
// state as Disconnected in that case.
func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
	m.state = Disconnected

	m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
	m.ctx, m.cancel = context.WithCancel(context.Background())

	m.client = m.clientFactory(m.opts)

	// AddRoute sets up the function for handling messages. These need to be
	// added in case we find a persistent session containing subscriptions so we
	// know where to dispatch persisted and new messages to. In the alternate
	// case that we need to create the subscriptions these will be replaced.
	for _, topic := range m.Topics {
		m.client.AddRoute(topic, m.recvMessage)
	}

	m.state = Connecting
	// Surface the connection failure instead of discarding it; startup is
	// still allowed to proceed.
	if err := m.connect(); err != nil {
		m.acc.AddError(err)
	}

	return nil
}

func (m *MQTTConsumer) connect() error {
if token := m.client.Connect(); token.Wait() && token.Error() != nil {
token := m.client.Connect()
if token.Wait() && token.Error() != nil {
err := token.Error()
m.state = Disconnected
return err
Expand All @@ -175,22 +206,26 @@ func (m *MQTTConsumer) connect() error {
m.sem = make(semaphore, m.MaxUndeliveredMessages)
m.messages = make(map[telegraf.TrackingID]bool)

// Only subscribe on first connection when using persistent sessions. On
// subsequent connections the subscriptions should be stored in the
// session, but the proper way to do this is to check the connection
// response to ensure a session was found.
if !m.PersistentSession || !m.subscribed {
topics := make(map[string]byte)
for _, topic := range m.Topics {
topics[topic] = byte(m.QoS)
}
subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage)
subscribeToken.Wait()
if subscribeToken.Error() != nil {
m.acc.AddError(fmt.Errorf("subscription error: topics: %s: %v",
strings.Join(m.Topics[:], ","), subscribeToken.Error()))
}
m.subscribed = true
// Persistent sessions should skip subscription if a session is present, as
// the subscriptions are stored by the server.
type sessionPresent interface {
SessionPresent() bool
}
if t, ok := token.(sessionPresent); ok && t.SessionPresent() {
log.Printf("D! [inputs.mqtt_consumer] Session found %v", m.Servers)
return nil
}

topics := make(map[string]byte)
for _, topic := range m.Topics {
topics[topic] = byte(m.QoS)
}

subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage)
subscribeToken.Wait()
if subscribeToken.Error() != nil {
m.acc.AddError(fmt.Errorf("subscription error: topics: %s: %v",
strings.Join(m.Topics[:], ","), subscribeToken.Error()))
}

return nil
Expand Down Expand Up @@ -316,12 +351,20 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
return opts, nil
}

// New returns an MQTTConsumer populated with the plugin defaults that builds
// its MQTT client through the supplied factory. The consumer starts in the
// Disconnected state.
func New(factory ClientFactory) *MQTTConsumer {
	consumer := new(MQTTConsumer)
	consumer.Servers = []string{"tcp://127.0.0.1:1883"}
	consumer.ConnectionTimeout = defaultConnectionTimeout
	consumer.MaxUndeliveredMessages = defaultMaxUndeliveredMessages
	consumer.clientFactory = factory
	consumer.state = Disconnected
	return consumer
}

func init() {
inputs.Add("mqtt_consumer", func() telegraf.Input {
return &MQTTConsumer{
ConnectionTimeout: defaultConnectionTimeout,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
state: Disconnected,
}
return New(func(o *mqtt.ClientOptions) Client {
return mqtt.NewClient(o)
})
})
}
Loading

0 comments on commit fcf5b1e

Please sign in to comment.