Skip to content

Commit

Permalink
refactor!: Rework code for refactored MessageBus Configuration
Browse files Browse the repository at this point in the history
BREAKING CHANGE: MessageQueue renamed to MessageBus and fields changed. See v3 Migration guide.

Signed-off-by: Leonard Goodell <leonard.goodell@intel.com>
  • Loading branch information
Leonard Goodell committed Jan 11, 2023
1 parent 8025d41 commit ebb4d57
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 25 deletions.
10 changes: 5 additions & 5 deletions example/cmd/device-simple/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,17 @@ Type = "consul"
Host = "localhost"
Port = 59881

[MessageQueue]
[MessageBus]
Protocol = "redis"
Host = "localhost"
Port = 6379
Type = "redis"
AuthMode = "usernamepassword" # required for redis messagebus (secure or insecure).
SecretName = "redisdb"
PublishTopicPrefix = "edgex/events/device" # /<device-profile-name>/<device-name>/<source-name> will be added to this Publish Topic prefix
[MessageBus.Topics]
PublishTopicPrefix = "edgex/events/device" # /<device-profile-name>/<device-name>/<source-name> will be added to this Publish Topic prefix
CommandRequestTopic = "edgex/device/command/request/device-simple/#" # subscribing for inbound command requests
CommandResponseTopicPrefix = "edgex/device/command/response" # publishing outbound command responses; <device-service>/<device-name>/<command-name>/<method> will be added to this publish topic prefix
[MessageQueue.Optional]
# Default MQTT Specific options that need to be here to enable environment variable overrides of them
# Client Identifiers
Expand All @@ -82,9 +85,6 @@ PublishTopicPrefix = "edgex/events/device" # /<device-profile-name>/<device-name
AutoReconnect = "true"
ConnectTimeout = "5" # Seconds
SkipCertVerify = "false" # Only used if Cert/Key file or Cert/Key PEMblock are specified
[MessageQueue.Topics]
CommandRequestTopic = "edgex/device/command/request/device-simple/#" # subscribing for inbound command requests
CommandResponseTopicPrefix = "edgex/device/command/response" # publishing outbound command responses; <device-service>/<device-name>/<command-name>/<method> will be added to this publish topic prefix

# Example SecretStore configuration.
# Only used when EDGEX_SECURITY_SECRET_STORE=true
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go 1.18
require (
bitbucket.org/bertimus9/systemstat v0.5.0
github.com/OneOfOne/xxhash v1.2.8
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.5
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.7
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.2
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.2
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.3
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4=
github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.5 h1:3WMWQ0oi++KFrau/e8BOTqgzORCa3G7bLG0w/wO72Io=
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.5/go.mod h1:cGXMUtbbzw+npJpMcFHPlXIN+ZPF71aiimhJ6v8kaSc=
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.7 h1:e0H7V7aFl9vC06ZUUQUzNpjqTQWs/XvdV/EKmk8zF/E=
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.7/go.mod h1:yW5dumg9IyfrHx4NYC5Ii0WVVB5OjTXQ/8/NkHHif5Q=
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2 h1:xp5MsP+qf/fuJxy8fT7k1N+c4j4C6w04qMCBXm6id7o=
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2/go.mod h1:1Vv4uWAo6r7k6jUlqVJW8JOL6YKVBc6sRL8Al3DrMck=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.2 h1:tleTxhbBISfDNn596rU71n+GOy27dMIme+v8Vl0uhpw=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.2/go.mod h1:7RwSq896VqelvSU7zYKs2tpZhgELVFECkiGf6XGLKfQ=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.2 h1:bNXJHfxTo/1SzQbiJQUnySUGVP5i2FNwFsXQ+RzqR8Q=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.2/go.mod h1:G0Vxoc8+JXwUqRH5ggyOZ/f/CIVPTswI5Ld7dI5uhIY=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.3 h1:+el2HxEt02uFXXBmHK8gWETPklNbPg4wvYln/4/ooHo=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.3/go.mod h1:G0Vxoc8+JXwUqRH5ggyOZ/f/CIVPTswI5Ld7dI5uhIY=
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3 h1:QgZF9f70Cwpvkjw3tP1aiVGHc+yNFJNzW6hO8pDs3fg=
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3/go.mod h1:2w8v0sv+i21nY+DY6JV4PFxsNTuxpGAjlNFlFMTfZkk=
github.com/edgexfoundry/go-mod-secrets/v3 v3.0.0-dev.1 h1:dib+mZUuHqwVHt9pKAWC4lh60Fbc+6vKrD919LaknwI=
Expand Down
4 changes: 3 additions & 1 deletion internal/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/edgexfoundry/device-sdk-go/v3/internal/container"
bootstrapInterfaces "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/interfaces"
"github.com/edgexfoundry/go-mod-bootstrap/v3/config"
gometrics "github.com/rcrowley/go-metrics"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
Expand Down Expand Up @@ -85,7 +86,8 @@ func SendEvent(event *dtos.Event, correlationID string, dic *di.Container) {
mc := bootstrapContainer.MessagingClientFrom(dic.Get)
ctx = context.WithValue(ctx, common.ContentType, encoding) // nolint: staticcheck
envelope := types.NewMessageEnvelope(bytes, ctx)
publishTopic := fmt.Sprintf("%s/%s/%s/%s", configuration.MessageQueue.PublishTopicPrefix, event.ProfileName, event.DeviceName, event.SourceName)
prefix := configuration.MessageBus.Topics[config.MessageBusPublishTopicPrefix]
publishTopic := fmt.Sprintf("%s/%s/%s/%s", prefix, event.ProfileName, event.DeviceName, event.SourceName)
err = mc.Publish(envelope, publishTopic)
if err != nil {
lc.Errorf("Failed to publish event to MessageBus: %s", err)
Expand Down
19 changes: 7 additions & 12 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ type ConfigurationStruct struct {
Driver map[string]string
// SecretStore contains information for connecting to the secure SecretStore (Vault) to retrieve or store secrets
SecretStore bootstrapConfig.SecretStoreInfo
// MessageQueue contains information for connecting to MessageBus which provides alternative way to publish event
MessageQueue bootstrapConfig.MessageBusInfo
// MessageBus contains information for connecting to MessageBus which provides alternative way to publish event
MessageBus bootstrapConfig.MessageBusInfo
// MaxEventSize is the maximum event size that can be sent to MessageBus or CoreData
MaxEventSize int64
}
Expand Down Expand Up @@ -68,11 +68,11 @@ func (c *ConfigurationStruct) UpdateWritableFromRaw(rawWritable interface{}) boo
// into an bootstrapConfig.BootstrapConfiguration struct contained within ConfigurationStruct).
func (c *ConfigurationStruct) GetBootstrap() bootstrapConfig.BootstrapConfiguration {
return bootstrapConfig.BootstrapConfiguration{
Clients: c.Clients,
Service: c.Service,
Registry: c.Registry,
SecretStore: c.SecretStore,
MessageQueue: c.MessageQueue,
Clients: c.Clients,
Service: c.Service,
Registry: c.Registry,
SecretStore: c.SecretStore,
MessageBus: c.MessageBus,
}
}

Expand All @@ -91,11 +91,6 @@ func (c *ConfigurationStruct) GetInsecureSecrets() bootstrapConfig.InsecureSecre
return c.Writable.InsecureSecrets
}

// GetMessageBusInfo returns the MessageBus configuration
func (c *ConfigurationStruct) GetMessageBusInfo() bootstrapConfig.MessageBusInfo {
return c.MessageQueue
}

// GetTelemetryInfo returns the service's Telemetry settings.
func (c *ConfigurationStruct) GetTelemetryInfo() *bootstrapConfig.TelemetryInfo {
return &c.Writable.Telemetry
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/messaging/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (

func SubscribeCommands(ctx context.Context, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus
requestTopic := messageBusInfo.Topics[CommandRequestTopic]
responseTopicPrefix := messageBusInfo.Topics[CommandResponseTopicPrefix]

Expand Down

0 comments on commit ebb4d57

Please sign in to comment.