From 46cb41614e241ef9f7257d19e12b51b702fd87b9 Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Tue, 17 Dec 2024 18:19:19 +0100 Subject: [PATCH] feat(event): Add MultiValueEvent interface and MultiObserverEvent implementation This PR introduces the first step in refactoring the event handling system to better support multiple values in a single event, which will help reduce allocations when processing events. This is part of a larger effort to improve performance and reduce memory allocations in the statsd exporter. Changes: - Add new `MultiValueEvent` interface that supports multiple values per event - Add `MultiObserverEvent` implementation for handling multiple observations - Add `ExplodableEvent` interface for backward compatibility - Add `Values()` method to existing event types - Add comprehensive tests for new interfaces and implementations This change is the foundation for future improvements that will: 1. Move explosion logic to a dedicated package 2. Update the line parser to use multi-value events 3. Modify the exporter to handle multi-value events directly 4. Eventually remove the need for event explosion The changes in this PR are backward compatible and don't affect existing functionality. Relates to #577 Signed-off-by: Pedro Tanaka --- pkg/event/event.go | 68 ++++++++++++++ pkg/event/event_test.go | 191 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 259 insertions(+) diff --git a/pkg/event/event.go b/pkg/event/event.go index d5e65cef..1765437b 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -39,6 +39,7 @@ func (c *CounterEvent) MetricName() string { return c.CMetricName } func (c *CounterEvent) Value() float64 { return c.CValue } func (c *CounterEvent) Labels() map[string]string { return c.CLabels } func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter } +func (c *CounterEvent) Values() []float64 { return []float64{c.CValue} } type GaugeEvent struct { GMetricName string @@ -51,6 +52,7 @@ func (g *GaugeEvent) MetricName() string { return g.GMetricName } func (g *GaugeEvent) Value() float64 { return g.GValue } func (g *GaugeEvent) Labels() map[string]string { return g.GLabels } func (g *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge } +func (g *GaugeEvent) Values() []float64 { return []float64{g.GValue} } type ObserverEvent struct { OMetricName string @@ -62,6 +64,7 @@ func (o *ObserverEvent) MetricName() string { return o.OMetricName } func (o *ObserverEvent) Value() float64 { return o.OValue } func (o *ObserverEvent) Labels() map[string]string { return o.OLabels } func (o *ObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver } +func (o *ObserverEvent) Values() []float64 { return []float64{o.OValue} } type Events []Event @@ -136,3 +139,68 @@ type UnbufferedEventHandler struct { func (ueh *UnbufferedEventHandler) Queue(events Events) { ueh.C <- events } + +// MultiValueEvent is an event that contains multiple values, it is going to replace the existing Event interface. +type MultiValueEvent interface { + MetricName() string + Value() float64 + Labels() map[string]string + MetricType() mapper.MetricType + Values() []float64 +} + +type MultiObserverEvent struct { + OMetricName string + OValues []float64 // DataDog extensions allow multiple values in a single sample + OLabels map[string]string + SampleRate float64 +} + +type ExplodableEvent interface { + Explode() []Event +} + +func (m *MultiObserverEvent) MetricName() string { return m.OMetricName } +func (m *MultiObserverEvent) Value() float64 { return m.OValues[0] } +func (m *MultiObserverEvent) Labels() map[string]string { return m.OLabels } +func (m *MultiObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver } +func (m *MultiObserverEvent) Values() []float64 { return m.OValues } + +// Explode returns a list of events that are the result of exploding the multi-value event. +// This will be used as a middle-step in the pipeline to convert multi-value events to single-value events. +// And keep the exporter code compatible with previous versions. +func (m *MultiObserverEvent) Explode() []Event { + if len(m.OValues) == 1 && m.SampleRate == 0 { + return []Event{m} + } + + events := make([]Event, 0, len(m.OValues)) + for _, value := range m.OValues { + labels := make(map[string]string, len(m.OLabels)) + for k, v := range m.OLabels { + labels[k] = v + } + + events = append(events, &ObserverEvent{ + OMetricName: m.OMetricName, + OValue: value, + OLabels: labels, + }) + } + + if m.SampleRate > 0 && m.SampleRate < 1 { + multiplier := int(1 / m.SampleRate) + multipliedEvents := make([]Event, 0, len(events)*multiplier) + for i := 0; i < multiplier; i++ { + multipliedEvents = append(multipliedEvents, events...) + } + return multipliedEvents + } + + return events +} + +var ( + _ ExplodableEvent = &MultiObserverEvent{} + _ MultiValueEvent = &MultiObserverEvent{} +) diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index 192ce29b..8c97f6c7 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -14,11 +14,13 @@ package event import ( + "reflect" "testing" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/statsd_exporter/pkg/clock" + "github.com/prometheus/statsd_exporter/pkg/mapper" ) var eventsFlushed = prometheus.NewCounter( @@ -85,3 +87,192 @@ func TestEventIntervalFlush(t *testing.T) { t.Fatal("Expected 10 events in the event channel, but got", len(events)) } } + +func TestMultiValueEvent(t *testing.T) { + tests := []struct { + name string + event MultiValueEvent + wantValues []float64 + wantName string + wantType mapper.MetricType + wantLabels map[string]string + }{ + { + name: "MultiObserverEvent with single value", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0, + }, + wantValues: []float64{1.0}, + wantName: "test_metric", + wantType: mapper.MetricTypeObserver, + wantLabels: map[string]string{"label": "value"}, + }, + { + name: "MultiObserverEvent with multiple values", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0, 2.0, 3.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0.5, + }, + wantValues: []float64{1.0, 2.0, 3.0}, + wantName: "test_metric", + wantType: mapper.MetricTypeObserver, + wantLabels: map[string]string{"label": "value"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.event.Values(); !reflect.DeepEqual(got, tt.wantValues) { + t.Errorf("MultiValueEvent.Values() = %v, want %v", got, tt.wantValues) + } + if got := tt.event.MetricName(); got != tt.wantName { + t.Errorf("MultiValueEvent.MetricName() = %v, want %v", got, tt.wantName) + } + if got := tt.event.MetricType(); got != tt.wantType { + t.Errorf("MultiValueEvent.MetricType() = %v, want %v", got, tt.wantType) + } + if got := tt.event.Labels(); !reflect.DeepEqual(got, tt.wantLabels) { + t.Errorf("MultiValueEvent.Labels() = %v, want %v", got, tt.wantLabels) + } + if got := tt.event.Value(); got != tt.wantValues[0] { + t.Errorf("MultiValueEvent.Value() = %v, want %v", got, tt.wantValues[0]) + } + }) + } +} + +func TestMultiObserverEvent_Explode(t *testing.T) { + tests := []struct { + name string + event *MultiObserverEvent + wantEvents []Event + }{ + { + name: "single value no sampling", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0, + }, + wantEvents: []Event{ + &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0, + }, + }, + }, + { + name: "multiple values no sampling", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0, 2.0, 3.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0, + }, + wantEvents: []Event{ + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 1.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 2.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 3.0, + OLabels: map[string]string{"label": "value"}, + }, + }, + }, + { + name: "multiple values with sampling", + event: &MultiObserverEvent{ + OMetricName: "test_metric", + OValues: []float64{1.0, 2.0}, + OLabels: map[string]string{"label": "value"}, + SampleRate: 0.5, + }, + wantEvents: []Event{ + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 1.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 2.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 1.0, + OLabels: map[string]string{"label": "value"}, + }, + &ObserverEvent{ + OMetricName: "test_metric", + OValue: 2.0, + OLabels: map[string]string{"label": "value"}, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.event.Explode() + if !reflect.DeepEqual(got, tt.wantEvents) { + t.Errorf("MultiObserverEvent.Explode() = %v, want %v", got, tt.wantEvents) + } + }) + } +} + +func TestEventImplementations(t *testing.T) { + tests := []struct { + name string + event interface{} + }{ + { + name: "MultiObserverEvent implements MultiValueEvent", + event: &MultiObserverEvent{}, + }, + { + name: "MultiObserverEvent implements ExplodableEvent", + event: &MultiObserverEvent{}, + }, + { + name: "MultiObserverEvent implements Event", + event: &MultiObserverEvent{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + switch tt.name { + case "MultiObserverEvent implements MultiValueEvent": + if _, ok := tt.event.(MultiValueEvent); !ok { + t.Error("MultiObserverEvent does not implement MultiValueEvent") + } + case "MultiObserverEvent implements ExplodableEvent": + if _, ok := tt.event.(ExplodableEvent); !ok { + t.Error("MultiObserverEvent does not implement ExplodableEvent") + } + case "MultiObserverEvent implements Event": + if _, ok := tt.event.(Event); !ok { + t.Error("MultiObserverEvent does not implement Event") + } + } + }) + } +}