Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kotel: optionally only use messaging.kafka.connects.count for connection metrics #691

Merged
merged 4 commits into from
Jun 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 51 additions & 7 deletions plugin/kotel/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Meter struct {
provider metric.MeterProvider
meter metric.Meter
instruments instruments

mergeConnectsMeter bool
}

// MeterOpt interface used for setting optional config properties.
Expand All @@ -52,6 +54,20 @@ func MeterProvider(provider metric.MeterProvider) MeterOpt {
})
}

// WithMergedConnectsMeter merges the `messaging.kafka.connect_errors.count`
// counter into the `messaging.kafka.connects.count` counter, adding an
// attribute "outcome" with the values "success" or "failure". This option
// shall be used when a single metric with different dimensions is preferred
// over two separate metrics that produce data at alternating intervals.
// For example, it becomes possible to alert on the metric no longer
// producing data.
func WithMergedConnectsMeter() MeterOpt {
return meterOptFunc(func(m *Meter) {
m.mergeConnectsMeter = true
})

}

func (o meterOptFunc) apply(m *Meter) {
o(m)
}
Expand Down Expand Up @@ -105,13 +121,17 @@ func (m *Meter) newInstruments() instruments {
log.Printf("failed to create connects instrument, %v", err)
}

connectErrs, err := m.meter.Int64Counter(
"messaging.kafka.connect_errors.count",
metric.WithUnit(dimensionless),
metric.WithDescription("Total number of connection errors, by broker"),
)
if err != nil {
log.Printf("failed to create connectErrs instrument, %v", err)
var connectErrs metric.Int64Counter
if !m.mergeConnectsMeter {
var err error
connectErrs, err = m.meter.Int64Counter(
"messaging.kafka.connect_errors.count",
metric.WithUnit(dimensionless),
metric.WithDescription("Total number of connection errors, by broker"),
)
if err != nil {
log.Printf("failed to create connectErrs instrument, %v", err)
}
}

disconnects, err := m.meter.Int64Counter(
Expand Down Expand Up @@ -232,6 +252,30 @@ func strnode(node int32) string {

func (m *Meter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
node := strnode(meta.NodeID)

if m.mergeConnectsMeter {
if err != nil {
m.instruments.connects.Add(
context.Background(),
1,
metric.WithAttributeSet(attribute.NewSet(
attribute.String("node_id", node),
attribute.String("outcome", "failure"),
)),
)
return
}
m.instruments.connects.Add(
context.Background(),
1,
metric.WithAttributeSet(attribute.NewSet(
attribute.String("node_id", node),
attribute.String("outcome", "success"),
)),
)
return
}

attributes := attribute.NewSet(attribute.String("node_id", node))
if err != nil {
m.instruments.connectErrs.Add(
Expand Down
173 changes: 173 additions & 0 deletions plugin/kotel/meter_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
package kotel

import (
"context"
"errors"
"net"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/twmb/franz-go/pkg/kgo"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
)

Expand Down Expand Up @@ -38,3 +48,166 @@ func TestWithMeter(t *testing.T) {
})
}
}

func TestHook_OnBrokerConnect(t *testing.T) {
t.Run("success path with mergeConnectsMeter:false", func(t *testing.T) {
r := sdkmetric.NewManualReader()
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r))
m := NewMeter(MeterProvider(mp))

meta := kgo.BrokerMetadata{NodeID: 1}
m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, nil)

rm := metricdata.ResourceMetrics{}
if err := r.Collect(context.Background(), &rm); err != nil {
t.Errorf("unexpected error collecting metrics: %s", err)
}

want := metricdata.Metrics{
Name: "messaging.kafka.connects.count",
Description: "Total number of connections opened, by broker",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
attribute.String("node_id", strconv.Itoa(int(meta.NodeID))),
),
},
},
},
}

if len(rm.ScopeMetrics) != 1 {
t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics))
}

metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0],
metricdatatest.IgnoreTimestamp(),
)
})
t.Run("failure path with mergeConnectsMeter:false", func(t *testing.T) {
r := sdkmetric.NewManualReader()
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r))
m := NewMeter(MeterProvider(mp))

meta := kgo.BrokerMetadata{NodeID: 1}
m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, errors.New("whatever error"))

rm := metricdata.ResourceMetrics{}
if err := r.Collect(context.Background(), &rm); err != nil {
t.Errorf("unexpected error collecting metrics: %s", err)
}

want := metricdata.Metrics{
Name: "messaging.kafka.connect_errors.count",
Description: "Total number of connection errors, by broker",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
attribute.String("node_id", strconv.Itoa(int(meta.NodeID))),
),
},
},
},
}

if len(rm.ScopeMetrics) != 1 {
t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics))
}

metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0],
metricdatatest.IgnoreTimestamp(),
)
})

t.Run("success path with mergeConnectsMeter:true", func(t *testing.T) {
r := sdkmetric.NewManualReader()
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r))
m := NewMeter(MeterProvider(mp), WithMergedConnectsMeter())

meta := kgo.BrokerMetadata{NodeID: 1}
m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, nil)

rm := metricdata.ResourceMetrics{}
if err := r.Collect(context.Background(), &rm); err != nil {
t.Errorf("unexpected error collecting metrics: %s", err)
}

want := metricdata.Metrics{
Name: "messaging.kafka.connects.count",
Description: "Total number of connections opened, by broker",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
attribute.String("node_id", strconv.Itoa(int(meta.NodeID))),
attribute.String("outcome", "success"),
),
},
},
},
}

if len(rm.ScopeMetrics) != 1 {
t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics))
}

metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0],
metricdatatest.IgnoreTimestamp(),
)
})
t.Run("failure path with mergeConnectsMeter:true", func(t *testing.T) {
r := sdkmetric.NewManualReader()
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r))
m := NewMeter(MeterProvider(mp), WithMergedConnectsMeter())

meta := kgo.BrokerMetadata{NodeID: 1}
m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, errors.New("whatever error"))

rm := metricdata.ResourceMetrics{}
if err := r.Collect(context.Background(), &rm); err != nil {
t.Errorf("unexpected error collecting metrics: %s", err)
}

want := metricdata.Metrics{
Name: "messaging.kafka.connects.count",
Description: "Total number of connections opened, by broker",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
attribute.String("node_id", strconv.Itoa(int(meta.NodeID))),
attribute.String("outcome", "failure"),
),
},
},
},
}

if len(rm.ScopeMetrics) != 1 {
t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics))
}

metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0],
metricdatatest.IgnoreTimestamp(),
)
})

}