Skip to content

Commit

Permalink
feat!: remove UseMessageBus config in 3.0
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Removed the 'Device.UseMessageBus' config, the code for
sending event via core-data REST client and the 'Clients.core-data' dependency

MessageBus is always enabled in 3.0 for sending events and receiving
system events for callbacks.

closes #1296

Signed-off-by: Chris Hung <chris@iotechsys.com>
  • Loading branch information
Chris Hung committed Feb 2, 2023
1 parent 621dc42 commit 482e5b9
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 88 deletions.
6 changes: 0 additions & 6 deletions example/cmd/device-simple/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ Port = 8500
Type = "consul"

[Clients]
[Clients.core-data]
Protocol = "http"
Host = "localhost"
Port = 59880

[Clients.core-metadata]
Protocol = "http"
Host = "localhost"
Expand Down Expand Up @@ -98,7 +93,6 @@ SecretName = "redisdb"
AsyncBufferSize = 1
EnableAsyncReadings = true
Labels = []
UseMessageBus = true
[Device.Discovery]
Enabled = false
Interval = "30s"
Expand Down
40 changes: 15 additions & 25 deletions internal/common/utils.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2017-2018 Canonical Ltd
// Copyright (C) 2018-2022 IOTech Ltd
// Copyright (C) 2018-2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -12,11 +12,12 @@ import (
"fmt"
"time"

"github.com/edgexfoundry/device-sdk-go/v3/internal/container"
bootstrapInterfaces "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/interfaces"
"github.com/edgexfoundry/go-mod-bootstrap/v3/config"
gometrics "github.com/rcrowley/go-metrics"

"github.com/edgexfoundry/device-sdk-go/v3/internal/container"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/interfaces"
Expand Down Expand Up @@ -76,34 +77,23 @@ func SendEvent(event *dtos.Event, correlationID string, dic *di.Container) {

// Check event size in kilobytes
if configuration.MaxEventSize > 0 && int64(len(bytes)) > configuration.MaxEventSize*1024 {
lc.Error(fmt.Sprintf("event size exceed MaxEventSize(%d KB)", configuration.MaxEventSize))
lc.Errorf("event size exceed MaxEventSize(%d KB)", configuration.MaxEventSize)
return
}

sent := false

if configuration.Device.UseMessageBus {
mc := bootstrapContainer.MessagingClientFrom(dic.Get)
ctx = context.WithValue(ctx, common.ContentType, encoding) // nolint: staticcheck
envelope := types.NewMessageEnvelope(bytes, ctx)
prefix := configuration.MessageBus.Topics[config.MessageBusPublishTopicPrefix]
publishTopic := fmt.Sprintf("%s/%s/%s/%s", prefix, event.ProfileName, event.DeviceName, event.SourceName)
err = mc.Publish(envelope, publishTopic)
if err != nil {
lc.Errorf("Failed to publish event to MessageBus: %s", err)
}
lc.Debugf("Event(profileName: %s, deviceName: %s, sourceName: %s, id: %s) published to MessageBus", event.ProfileName, event.DeviceName, event.SourceName, event.Id)
sent = true
} else {
ec := bootstrapContainer.EventClientFrom(dic.Get)
_, err := ec.Add(ctx, req)
if err != nil {
lc.Errorf("Failed to push event to Coredata: %s", err)
} else {
lc.Debugf("Event(profileName: %s, deviceName: %s, sourceName: %s, id: %s) pushed to Coredata", event.ProfileName, event.DeviceName, event.SourceName, event.Id)
}
sent = true
mc := bootstrapContainer.MessagingClientFrom(dic.Get)
ctx = context.WithValue(ctx, common.ContentType, encoding) // nolint: staticcheck
envelope := types.NewMessageEnvelope(bytes, ctx)
prefix := configuration.MessageBus.Topics[config.MessageBusPublishTopicPrefix]
publishTopic := fmt.Sprintf("%s/%s/%s/%s", prefix, event.ProfileName, event.DeviceName, event.SourceName)
err = mc.Publish(envelope, publishTopic)
if err != nil {
lc.Errorf("Failed to publish event to MessageBus: %s", err)
return
}
lc.Debugf("Event(profileName: %s, deviceName: %s, sourceName: %s, id: %s) published to MessageBus", event.ProfileName, event.DeviceName, event.SourceName, event.Id)
sent = true

if sent {
eventsSent.Inc(1)
Expand Down
31 changes: 5 additions & 26 deletions internal/common/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2022 IOTech Ltd
// Copyright (C) 2022-2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -12,12 +12,10 @@ import (
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/interfaces/mocks"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
clientMocks "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/interfaces/mocks"
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger"
mocks2 "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger/mocks"
"github.com/edgexfoundry/go-mod-core-contracts/v3/common"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
dtoCommon "github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/common"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/requests"
msgMocks "github.com/edgexfoundry/go-mod-messaging/v3/messaging/mocks"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -63,7 +61,6 @@ func NewMockDIC() *di.Container {
}

func TestSendEvent(t *testing.T) {

event := buildEvent()
req := requests.NewAddEventRequest(event)
bytes, _, err := req.Encode()
Expand All @@ -74,57 +71,39 @@ func TestSendEvent(t *testing.T) {
name string
event *dtos.Event
maxEventSize int64
useMessageBus bool
eventTooLarge bool
}{
{"Valid, unlimited max event size", &event, 0, false, false},
{"Valid, publish to message bus", &event, int64(eventSize + 1), true, false},
{"Valid, push to core data ", &event, int64(eventSize + 1), false, false},
{"Invalid, over max event size", &event, int64(eventSize - 1), false, true},
{"Valid, unlimited max event size", &event, 0, false},
{"Valid, publish to message bus", &event, int64(eventSize + 1), false},
{"Invalid, over max event size", &event, int64(eventSize - 1), true},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dic := NewMockDIC()
ecMock := &clientMocks.EventClient{}
ecMock.On("Add", mock.Anything, mock.Anything).Return(dtoCommon.BaseWithIdResponse{}, nil)
mcMock := &msgMocks.MessageClient{}
mcMock.On("Publish", mock.Anything, mock.Anything).Return(nil)

dic.Update(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
return &config.ConfigurationStruct{
MaxEventSize: tt.maxEventSize,
Device: config.DeviceInfo{
UseMessageBus: tt.useMessageBus,
},
}
},
bootstrapContainer.MessagingClientName: func(get di.Get) interface{} {
return mcMock
},
bootstrapContainer.EventClientName: func(get di.Get) interface{} {
return ecMock
},
})

InitializeSentMetrics(logger.NewMockClient(), dic)

SendEvent(tt.event, testUUIDString, dic)
if tt.eventTooLarge {
ecMock.AssertNumberOfCalls(t, "Add", 0)
mcMock.AssertNumberOfCalls(t, "Publish", 0)
assert.Equal(t, int64(0), eventsSent.Count())
assert.Equal(t, int64(0), readingsSent.Count())
} else if tt.useMessageBus {
ecMock.AssertNumberOfCalls(t, "Add", 0)
mcMock.AssertNumberOfCalls(t, "Publish", 1)
} else {
ecMock.AssertNumberOfCalls(t, "Add", 1)
mcMock.AssertNumberOfCalls(t, "Publish", 0)
}

if !tt.eventTooLarge {
mcMock.AssertNumberOfCalls(t, "Publish", 1)
assert.Equal(t, int64(1), eventsSent.Count())
assert.Equal(t, int64(1), readingsSent.Count())
}
Expand Down
2 changes: 0 additions & 2 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ type DeviceInfo struct {
EnableAsyncReadings bool
// Labels are properties applied to the device service to help with searching
Labels []string
// UseMessageBus indicates whether or not the Event are published directly to the MessageBus
UseMessageBus bool
}

// DiscoveryInfo is a struct which contains configuration of device auto discovery.
Expand Down
41 changes: 12 additions & 29 deletions pkg/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func Main(serviceName string, serviceVersion string, proto interface{}, ctx cont
httpServer.BootstrapHandler,
messageBusBootstrapHandler,
handlers.NewServiceMetrics(ds.ServiceName).BootstrapHandler, // Must be after Messaging
clientBootstrapHandler,
handlers.NewClientsBootstrap().BootstrapHandler,
autoevent.BootstrapHandler,
NewBootstrap(router).BootstrapHandler,
autodiscovery.BootstrapHandler,
Expand All @@ -93,39 +93,22 @@ func Main(serviceName string, serviceVersion string, proto interface{}, ctx cont
ds.Stop(false)
}

func clientBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool {
configuration := container.ConfigurationFrom(dic.Get)
if configuration.Device.UseMessageBus {
delete(configuration.Clients, common.CoreDataServiceKey)
func messageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool {
if !handlers.MessagingBootstrapHandler(ctx, wg, startupTimer, dic) {
return false
}

if !handlers.NewClientsBootstrap().BootstrapHandler(ctx, wg, startupTimer, dic) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
err := messaging.SubscribeCommands(ctx, dic)
if err != nil {
lc.Errorf("Failed to subscribe internal command request: %v", err)
return false
}

return true
}

func messageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool {
configuration := container.ConfigurationFrom(dic.Get)
if configuration.Device.UseMessageBus {
if !handlers.MessagingBootstrapHandler(ctx, wg, startupTimer, dic) {
return false
}

lc := bootstrapContainer.LoggingClientFrom(dic.Get)

err := messaging.SubscribeCommands(ctx, dic)
if err != nil {
lc.Errorf("Failed to subscribe internal command request: %v", err)
return false
}

err = messaging.DeviceCallback(ctx, dic)
if err != nil {
lc.Errorf("Failed to subscribe Metadata system event: %v", err)
return false
}
err = messaging.DeviceCallback(ctx, dic)
if err != nil {
lc.Errorf("Failed to subscribe Metadata system event: %v", err)
return false
}

return true
Expand Down

0 comments on commit 482e5b9

Please sign in to comment.