Skip to content

Commit

Permalink
Merge branch 'fix/example_Subscriber_test' of github.com:gofr-dev/gof…
Browse files Browse the repository at this point in the history
…r into fix/example_Subscriber_test
  • Loading branch information
Umang01-hash committed Jul 4, 2024
2 parents 44a93a0 + 4e48615 commit 7c1479c
Show file tree
Hide file tree
Showing 12 changed files with 660 additions and 97 deletions.
5 changes: 5 additions & 0 deletions docs/references/configs/page.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,9 @@ This document lists all the configuration options supported by the GoFr framewor
- MQTT_QOS
- Quality of Service Level

---

- MQTT_KEEP_ALIVE
- Sends regular messages to check the link is active. May not work as expected if handling func is blocking execution

{% /table %}
63 changes: 38 additions & 25 deletions pkg/gofr/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package container
import (
"strconv"
"strings"
"time"

_ "github.com/go-sql-driver/mysql" // This is required to be blank import
"gofr.dev/pkg/gofr/config"
Expand Down Expand Up @@ -113,35 +114,47 @@ func (c *Container) Create(conf config.Config) {
SubscriptionName: conf.Get("GOOGLE_SUBSCRIPTION_NAME"),
}, c.Logger, c.metricsManager)
case "MQTT":
var qos byte

port, _ := strconv.Atoi(conf.Get("MQTT_PORT"))
order, _ := strconv.ParseBool(conf.GetOrDefault("MQTT_MESSAGE_ORDER", "false"))

switch conf.Get("MQTT_QOS") {
case "1":
qos = 1
case "2":
qos = 2
default:
qos = 0
}
c.PubSub = c.createMqttPubSub(conf)
}

configs := &mqtt.Config{
Protocol: conf.GetOrDefault("MQTT_PROTOCOL", "tcp"), // using tcp as default method to connect to broker
Hostname: conf.Get("MQTT_HOST"),
Port: port,
Username: conf.Get("MQTT_USER"),
Password: conf.Get("MQTT_PASSWORD"),
ClientID: conf.Get("MQTT_CLIENT_ID_SUFFIX"),
QoS: qos,
Order: order,
}
c.File = file.New(c.Logger)
}

func (c *Container) createMqttPubSub(conf config.Config) pubsub.Client {
var qos byte

port, _ := strconv.Atoi(conf.Get("MQTT_PORT"))
order, _ := strconv.ParseBool(conf.GetOrDefault("MQTT_MESSAGE_ORDER", "false"))

c.PubSub = mqtt.New(configs, c.Logger, c.metricsManager)
keepAlive, err := time.ParseDuration(conf.Get("MQTT_KEEP_ALIVE"))
if err != nil {
keepAlive = 30 * time.Second

c.Logger.Debug("MQTT_KEEP_ALIVE is not set or ivalid, setting it to 30 seconds")
}

c.File = file.New(c.Logger)
switch conf.Get("MQTT_QOS") {
case "1":
qos = 1
case "2":
qos = 2
default:
qos = 0
}

configs := &mqtt.Config{
Protocol: conf.GetOrDefault("MQTT_PROTOCOL", "tcp"), // using tcp as default method to connect to broker
Hostname: conf.Get("MQTT_HOST"),
Port: port,
Username: conf.Get("MQTT_USER"),
Password: conf.Get("MQTT_PASSWORD"),
ClientID: conf.Get("MQTT_CLIENT_ID_SUFFIX"),
QoS: qos,
Order: order,
KeepAlive: keepAlive,
}

return mqtt.New(configs, c.Logger, c.metricsManager)
}

// GetHTTPService returns registered HTTP services.
Expand Down
3 changes: 3 additions & 0 deletions pkg/gofr/datasource/pubsub/mqtt/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"gofr.dev/pkg/gofr/datasource"
)

//go:generate go run go.uber.org/mock/mockgen -destination=mock_client.go -package=mqtt github.com/eclipse/paho.mqtt.golang Client
//go:generate go run go.uber.org/mock/mockgen -source=interface.go -destination=mock_interfaces.go -package=mqtt

type Logger interface {
Infof(format string, args ...interface{})
Debug(args ...interface{})
Expand Down
180 changes: 180 additions & 0 deletions pkg/gofr/datasource/pubsub/mqtt/mock_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7c1479c

Please sign in to comment.