Skip to content

Commit

Permalink
Updated structs and tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Frank Davidson <frank_davidson@manulife.com>
Signed-off-by: Frank Davidson <ffdavidson@gmail.com>
  • Loading branch information
= authored and davidsonff committed Apr 8, 2020
1 parent 4b3b9ba commit b51efcb
Show file tree
Hide file tree
Showing 9 changed files with 594 additions and 126 deletions.
258 changes: 256 additions & 2 deletions bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@
package main

import (
"fmt"
"net"
"reflect"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)

func TestHandlePacket(t *testing.T) {
Expand Down Expand Up @@ -417,11 +425,35 @@ func TestHandlePacket(t *testing.T) {
},
}

for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{nil, nil, log.NewNopLogger()}, &mockStatsDTCPListener{listener.StatsDTCPListener{nil, nil, log.NewNopLogger()}, log.NewNopLogger()}} {
for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
}, &mockStatsDTCPListener{listener.StatsDTCPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
TCPLineTooLong: tcpLineTooLong,
}, log.NewNopLogger()}} {
events := make(chan event.Events, 32)
l.SetEventHandler(&event.UnbufferedEventHandler{C: events})
for i, scenario := range scenarios {
l.HandlePacket([]byte(scenario.in), udpPackets, linesReceived, eventsFlushed, *sampleErrors, samplesReceived, tagErrors, tagsReceived)
l.HandlePacket([]byte(scenario.in))

le := len(events)
// Flatten actual events.
Expand All @@ -442,3 +474,225 @@ func TestHandlePacket(t *testing.T) {
}
}
}

type statsDPacketHandler interface {
HandlePacket(packet []byte)
SetEventHandler(eh event.EventHandler)
}

type mockStatsDTCPListener struct {
listener.StatsDTCPListener
log.Logger
}

func (ml *mockStatsDTCPListener) HandlePacket(packet []byte) {
// Forcing IPv4 because the TravisCI build environment does not have IPv6
// addresses.
lc, err := net.ListenTCP("tcp4", nil)
if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: listen failed: %v", err))
}

defer lc.Close()

go func() {
cc, err := net.DialTCP("tcp", nil, lc.Addr().(*net.TCPAddr))
if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: dial failed: %v", err))
}

defer cc.Close()

n, err := cc.Write(packet)
if err != nil || n != len(packet) {
panic(fmt.Sprintf("mockStatsDTCPListener: write failed: %v,%d", err, n))
}
}()

sc, err := lc.AcceptTCP()
if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err))
}
ml.HandleConn(sc)
}

// TestTtlExpiration validates expiration of time series.
// foobar metric without mapping should expire with default ttl of 1s
// bazqux metric should expire with ttl of 2s
func TestTtlExpiration(t *testing.T) {
// Mock a time.NewTicker
tickerCh := make(chan time.Time)
clock.ClockInstance = &clock.Clock{
TickerCh: tickerCh,
}

config := `
defaults:
ttl: 1s
mappings:
- match: bazqux.*
name: bazqux
ttl: 2s
`
// Create mapper from config and start an Exporter with a synchronous channel
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
events := make(chan event.Events)
defer close(events)
go func() {
ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()

ev := event.Events{
// event with default ttl = 1s
&event.GaugeEvent{
GMetricName: "foobar",
GValue: 200,
},
// event with ttl = 2s from a mapping
&event.TimerEvent{
TMetricName: "bazqux.main",
TValue: 42000,
},
}

var metrics []*dto.MetricFamily
var foobarValue *float64
var bazquxValue *float64

// Step 1. Send events with statsd metrics.
// Send empty Events to wait for events are handled.
// saveLabelValues will use fake instant as a lastRegisteredAt time.
clock.ClockInstance.Instant = time.Unix(0, 0)
events <- ev
events <- event.Events{}

// Check values
metrics, err = prometheus.DefaultGatherer.Gather()
if err != nil {
t.Fatal("Gather should not fail")
}
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
if foobarValue == nil || bazquxValue == nil {
t.Fatalf("Gauge `foobar` and Summary `bazqux` should be gathered")
}
if *foobarValue != 200 {
t.Fatalf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue)
}
if *bazquxValue != 42 {
t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
}

// Step 2. Increase Instant to emulate metrics expiration after 1s
clock.ClockInstance.Instant = time.Unix(1, 10)
clock.ClockInstance.TickerCh <- time.Unix(0, 0)
events <- event.Events{}

// Check values
metrics, err = prometheus.DefaultGatherer.Gather()
if err != nil {
t.Fatal("Gather should not fail")
}
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
if foobarValue != nil {
t.Fatalf("Gauge `foobar` should be expired")
}
if bazquxValue == nil {
t.Fatalf("Summary `bazqux` should be gathered")
}
if *bazquxValue != 42 {
t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
}

// Step 3. Increase Instant to emulate metrics expiration after 2s
clock.ClockInstance.Instant = time.Unix(2, 200)
clock.ClockInstance.TickerCh <- time.Unix(0, 0)
events <- event.Events{}

// Check values
metrics, err = prometheus.DefaultGatherer.Gather()
if err != nil {
t.Fatal("Gather should not fail")
}
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
if bazquxValue != nil {
t.Fatalf("Summary `bazqux` should be expired")
}
if foobarValue != nil {
t.Fatalf("Gauge `foobar` should not be gathered after expiration")
}
}

// getFloat64 search for metric by name in array of MetricFamily and then search a value by labels.
// Method returns a value or nil if metric is not found.
func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 {
var metricFamily *dto.MetricFamily
for _, m := range metrics {
if *m.Name == name {
metricFamily = m
break
}
}
if metricFamily == nil {
return nil
}

var metric *dto.Metric
labelStr := fmt.Sprintf("%v", labels)
for _, m := range metricFamily.Metric {
l := labelPairsAsLabels(m.GetLabel())
ls := fmt.Sprintf("%v", l)
if labelStr == ls {
metric = m
break
}
}
if metric == nil {
return nil
}

var value float64
if metric.Gauge != nil {
value = metric.Gauge.GetValue()
return &value
}
if metric.Counter != nil {
value = metric.Counter.GetValue()
return &value
}
if metric.Histogram != nil {
value = metric.Histogram.GetSampleSum()
return &value
}
if metric.Summary != nil {
value = metric.Summary.GetSampleSum()
return &value
}
if metric.Untyped != nil {
value = metric.Untyped.GetValue()
return &value
}
panic(fmt.Errorf("collected a non-gauge/counter/histogram/summary/untyped metric: %s", metric))
}

func labelPairsAsLabels(pairs []*dto.LabelPair) (labels prometheus.Labels) {
labels = prometheus.Labels{}
for _, pair := range pairs {
if pair.Name == nil {
continue
}
value := ""
if pair.Value != nil {
value = *pair.Value
}
labels[*pair.Name] = value
}
return
}
6 changes: 3 additions & 3 deletions exporter_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func benchmarkUDPListener(times int, b *testing.B) {

for i := 0; i < times; i++ {
for _, line := range bytesInput {
l.HandlePacket([]byte(line), udpPackets, linesReceived, eventsFlushed, *sampleErrors, samplesReceived, tagErrors, tagsReceived)
l.HandlePacket([]byte(line))
}
}
}
Expand Down Expand Up @@ -142,7 +142,7 @@ mappings:
b.Fatalf("Config load error: %s %s", config, err)
}

ex := exporter.NewExporter(testMapper, log.NewNopLogger())
ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
for i := 0; i < b.N; i++ {
ec := make(chan event.Events, 1000)
go func() {
Expand All @@ -152,6 +152,6 @@ mappings:
close(ec)
}()

ex.Listen(ec, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(ec)
}
}
Loading

0 comments on commit b51efcb

Please sign in to comment.