Skip to content

Commit

Permalink
feat:Add a logging handler and insert it a few places for help debugg…
Browse files Browse the repository at this point in the history
…ing things.
  • Loading branch information
schmidtw committed Jan 9, 2025
1 parent 7827380 commit 297da02
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 15 deletions.
89 changes: 74 additions & 15 deletions cmd/xmidt-agent/wrphandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"github.com/xmidt-org/xmidt-agent/internal/websocket"
"github.com/xmidt-org/xmidt-agent/internal/websocket/event"
"github.com/xmidt-org/xmidt-agent/internal/wrphandlers/auth"
loghandler "github.com/xmidt-org/xmidt-agent/internal/wrphandlers/logging"
"github.com/xmidt-org/xmidt-agent/internal/wrphandlers/missing"
"github.com/xmidt-org/xmidt-agent/internal/wrphandlers/mocktr181"
"github.com/xmidt-org/xmidt-agent/internal/wrphandlers/qos"
"github.com/xmidt-org/xmidt-agent/internal/wrphandlers/xmidt_agent_crud"
"go.uber.org/fx"
"go.uber.org/zap"
)

var (
Expand All @@ -39,7 +41,8 @@ func provideWRPHandlers() fx.Option {
type wsAdapterIn struct {
fx.In

WS *websocket.Websocket
WS *websocket.Websocket
Logger *zap.Logger

// wrphandlers
AuthHandler *auth.Handler
Expand All @@ -51,27 +54,44 @@ type wsAdapterOut struct {
Cancels []func() `group:"cancels,flatten"`
}

func provideWSEventorToHandlerAdapter(in wsAdapterIn) wsAdapterOut {
func provideWSEventorToHandlerAdapter(in wsAdapterIn) (wsAdapterOut, error) {
lh, err := loghandler.New(in.AuthHandler,
in.Logger.With(
zap.String("stage", "ingress"),
zap.String("handler", "websocket")))

if err != nil {
return wsAdapterOut{}, err
}

return wsAdapterOut{
Cancels: []func(){
in.WS.AddMessageListener(
event.MsgListenerFunc(func(m wrp.Message) {
_ = in.AuthHandler.HandleWrp(m)
}),
),
}}
_ = lh.HandleWrp(m)
})),
}}, nil
}

type qosIn struct {
fx.In

QOS QOS
WS *websocket.Websocket
QOS QOS
Logger *zap.Logger
WS *websocket.Websocket
}

func provideQOSHandler(in qosIn) (*qos.Handler, error) {
lh, err := loghandler.New(in.WS,
in.Logger.With(
zap.String("stage", "egress"),
zap.String("handler", "websocket")))
if err != nil {
return nil, err
}

return qos.New(
in.WS,
lh,
qos.MaxQueueBytes(in.QOS.MaxQueueBytes),
qos.MaxMessageBytes(in.QOS.MaxMessageBytes),
qos.Priority(in.QOS.Priority),
Expand Down Expand Up @@ -109,6 +129,7 @@ type authIn struct {
// Configuration
// Note, DeviceID and PartnerID is pulled from the Identity configuration
Identity Identity
Logger *zap.Logger

// wrphandlers

Expand All @@ -117,7 +138,16 @@ type authIn struct {
}

func provideAuthHandler(in authIn) (*auth.Handler, error) {
h, err := auth.New(in.MissingHandler, in.Egress, string(in.Identity.DeviceID), in.Identity.PartnerID)

lh, err := loghandler.New(in.MissingHandler,
in.Logger.With(
zap.String("stage", "ingress"),
zap.String("handler", "authorized")))
if err != nil {
return nil, err
}

h, err := auth.New(lh, in.Egress, string(in.Identity.DeviceID), in.Identity.PartnerID)
if err != nil {
err = errors.Join(ErrWRPHandlerConfig, err)
}
Expand Down Expand Up @@ -159,6 +189,7 @@ type pubsubIn struct {
// Note, DeviceID and PartnerID is pulled from the Identity configuration
Identity Identity
Pubsub Pubsub
Logger *zap.Logger

// wrphandlers
Egress *qos.Handler
Expand All @@ -172,11 +203,19 @@ type pubsubOut struct {
}

func providePubSubHandler(in pubsubIn) (pubsubOut, error) {
var egress pubsub.CancelFunc
var cancel pubsub.CancelFunc

lh, err := loghandler.New(in.Egress,
in.Logger.With(
zap.String("stage", "egress"),
zap.String("handler", "pubsub")))
if err != nil {
return pubsubOut{}, err
}

opts := []pubsub.Option{
pubsub.WithPublishTimeout(in.Pubsub.PublishTimeout),
pubsub.WithEgressHandler(in.Egress, &egress),
pubsub.WithEgressHandler(lh, &cancel),
}
ps, err := pubsub.New(
in.Identity.DeviceID,
Expand All @@ -188,7 +227,7 @@ func providePubSubHandler(in pubsubIn) (pubsubOut, error) {

return pubsubOut{
PubSub: ps,
Cancel: egress,
Cancel: cancel,
}, err
}

Expand All @@ -199,6 +238,7 @@ type mockTr181In struct {
// Note, DeviceID and PartnerID is pulled from the Identity configuration
Identity Identity
MockTr181 MockTr181
Logger *zap.Logger

PubSub *pubsub.PubSub
}
Expand All @@ -213,16 +253,35 @@ func provideMockTr181Handler(in mockTr181In) (mockTr181Out, error) {
return mockTr181Out{}, nil
}

loggerOut, err := loghandler.New(in.PubSub,
in.Logger.With(
zap.String("stage", "egress"),
zap.String("handler", "mockTR181"),
))
if err != nil {
return mockTr181Out{}, err
}
mockDefaults := []mocktr181.Option{
mocktr181.FilePath(in.MockTr181.FilePath),
mocktr181.Enabled(in.MockTr181.Enabled),
}
mocktr181Handler, err := mocktr181.New(in.PubSub, string(in.Identity.DeviceID), mockDefaults...)
mocktr181Handler, err := mocktr181.New(loggerOut, string(in.Identity.DeviceID), mockDefaults...)
if err != nil {
return mockTr181Out{}, errors.Join(ErrWRPHandlerConfig, err)
}
if mocktr181Handler == nil {
return mockTr181Out{}, errors.Join(ErrWRPHandlerConfig, errors.New("mocktr181 handler is nil"))
}

mocktr, err := in.PubSub.SubscribeService(in.MockTr181.ServiceName, mocktr181Handler)
loggerIn, err := loghandler.New(mocktr181Handler,
in.Logger.With(
zap.String("stage", "ingress"),
zap.String("handler", "mockTR181"),
))
if err != nil {
return mockTr181Out{}, err
}
mocktr, err := in.PubSub.SubscribeService(in.MockTr181.ServiceName, loggerIn)
if err != nil {
return mockTr181Out{}, errors.Join(ErrWRPHandlerConfig, err)
}
Expand Down
61 changes: 61 additions & 0 deletions internal/wrphandlers/logging/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// SPDX-FileCopyrightText: 2025 Comcast Cable Communications Management, LLC
// SPDX-License-Identifier: Apache-2.0

package loghandler

import (
"fmt"

"github.com/xmidt-org/wrp-go/v3"
"github.com/xmidt-org/xmidt-agent/internal/wrpkit"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var (
ErrInvalidInput = fmt.Errorf("invalid input")
)

// Handler logs information about the message being processed and sends the
// message to the next handler in the chain.
type Handler struct {
next wrpkit.Handler
logger *zap.Logger
level zapcore.Level
}

// New creates a new Handler. If the next handler is nil or the logger is nil,
// an error is returned. If no level is provided, DebugLevel is used.
func New(next wrpkit.Handler, logger *zap.Logger, level ...zapcore.Level) (*Handler, error) {
if next == nil || logger == nil {
return nil, ErrInvalidInput
}

level = append(level, zapcore.DebugLevel)

return &Handler{
next: next,
logger: logger,
level: level[0],
}, nil
}

// HandleWrp is called to process a message. If the next handler fails to
// process the message, a response is sent to the source of the message.
func (h Handler) HandleWrp(msg wrp.Message) error {
fields := []zap.Field{
zap.String("type", msg.Type.String()),
zap.String("source", msg.Source),
zap.String("dest", msg.Destination),
zap.Strings("partnerids", msg.PartnerIDs),
zap.Int("payload_size", len(msg.Payload)),
}

if msg.TransactionUUID != "" {
fields = append(fields, zap.String("transaction_uuid", msg.TransactionUUID))
}

h.logger.Log(h.level, "Handling message", fields...)

return h.next.HandleWrp(msg)
}
63 changes: 63 additions & 0 deletions internal/wrphandlers/logging/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// SPDX-FileCopyrightText: 2025 Comcast Cable Communications Management, LLC
// SPDX-License-Identifier: Apache-2.0

package loghandler

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type MockHandler struct {
mock.Mock
}

func (m *MockHandler) HandleWrp(msg wrp.Message) error {
args := m.Called(msg)
return args.Error(0)
}

func TestNew(t *testing.T) {
logger := zap.NewNop()
next := &MockHandler{}

handler, err := New(next, logger)
assert.NoError(t, err)
assert.NotNil(t, handler)
assert.Equal(t, zapcore.DebugLevel, handler.level)

handler, err = New(nil, logger)
assert.Error(t, err)
assert.Nil(t, handler)

handler, err = New(next, nil)
assert.Error(t, err)
assert.Nil(t, handler)
}

func TestHandleWrp(t *testing.T) {
logger, _ := zap.NewDevelopment()
next := &MockHandler{}
handler, err := New(next, logger)
assert.NoError(t, err)
assert.NotNil(t, handler)

msg := wrp.Message{
Type: wrp.SimpleRequestResponseMessageType,
Source: "source",
Destination: "destination",
Payload: []byte("payload"),
TransactionUUID: "uuid",
}

next.On("HandleWrp", msg).Return(nil)

err = handler.HandleWrp(msg)
assert.NoError(t, err)
next.AssertExpectations(t)
}

0 comments on commit 297da02

Please sign in to comment.