-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathmetrics_to_prw_v2.go
132 lines (114 loc) · 4.28 KB
/
metrics_to_prw_v2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"
import (
"errors"
"fmt"
"strconv"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"
prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)
// FromMetricsV2 translates md into Prometheus remote write 2.0 time series.
//
// It returns the converted series in a map keyed by the decimal string of each
// series' index in the converter's output slice, the symbols table populated
// during conversion, and the errors accumulated while converting. The map and
// the symbols table are returned even when the error is non-nil.
func FromMetricsV2(md pmetric.Metrics, settings Settings) (map[string]*writev2.TimeSeries, writev2.SymbolsTable, error) {
	converter := newPrometheusConverterV2()
	conversionErr := converter.fromMetrics(md, settings)

	series := converter.timeSeries()
	result := make(map[string]*writev2.TimeSeries, len(series))
	for idx := range series {
		key := strconv.Itoa(idx)
		// Point at the slice element itself rather than at a loop copy.
		result[key] = &series[idx]
	}

	return result, converter.symbolTable, conversionErr
}
// prometheusConverterV2 converts from OTLP to Prometheus write 2.0 format.
// It accumulates the converted time series together with the symbols table
// that their label references point into.
type prometheusConverterV2 struct {
	// unique holds the converted time series, keyed by the value that
	// timeSeriesSignature returns for the series' labels.
	// TODO handle conflicts
	unique map[uint64]*writev2.TimeSeries
	// symbolTable stores the label names and values; the references it hands
	// out are what the series keep in LabelsRefs.
	symbolTable writev2.SymbolsTable
}
// newPrometheusConverterV2 returns a converter with an empty series index and
// a symbols table obtained from writev2.NewSymbolTable.
func newPrometheusConverterV2() *prometheusConverterV2 {
	converter := new(prometheusConverterV2)
	converter.unique = make(map[uint64]*writev2.TimeSeries)
	converter.symbolTable = writev2.NewSymbolTable()
	return converter
}
// fromMetrics walks every metric in md and adds the supported ones to the
// converter in Prometheus remote write 2.0 form.
//
// Only gauges are converted at the moment; sums, histograms, exponential
// histograms and summaries are accepted but not yet translated. A problem with
// one metric (invalid temporality, no data points, unknown type) is appended
// to the returned error and does not stop the remaining metrics from being
// processed.
func (c *prometheusConverterV2) fromMetrics(md pmetric.Metrics, settings Settings) (errs error) {
	allResources := md.ResourceMetrics()
	for ri := 0; ri < allResources.Len(); ri++ {
		rm := allResources.At(ri)
		resource := rm.Resource()
		scopes := rm.ScopeMetrics()

		// Newest timestamp seen in this ResourceMetrics; intended for the
		// "target" info metric once that is implemented.
		var mostRecentTimestamp pcommon.Timestamp

		for si := 0; si < scopes.Len(); si++ {
			metrics := scopes.At(si).Metrics()

			// TODO: decide if instrumentation library information should be exported as labels
			for mi := 0; mi < metrics.Len(); mi++ {
				metric := metrics.At(mi)
				mostRecentTimestamp = max(mostRecentTimestamp, mostRecentTimestampInMetric(metric))

				if !isValidAggregationTemporality(metric) {
					invalid := fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name())
					errs = multierr.Append(errs, invalid)
					continue
				}

				promName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes)

				// Dispatch on the metric type.
				//exhaustive:enforce
				switch metric.Type() {
				case pmetric.MetricTypeGauge:
					points := metric.Gauge().DataPoints()
					if points.Len() == 0 {
						errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
						break
					}
					c.addGaugeNumberDataPoints(points, resource, settings, promName)
				case pmetric.MetricTypeSum,
					pmetric.MetricTypeHistogram,
					pmetric.MetricTypeExponentialHistogram,
					pmetric.MetricTypeSummary:
					// TODO implement
				default:
					errs = multierr.Append(errs, errors.New("unsupported metric type"))
				}
			}
		}
		// TODO implement
		// addResourceTargetInfov2(resource, settings, mostRecentTimestamp, c)
	}

	return errs
}
// timeSeries returns a copy of every writev2.TimeSeries the converter has
// collected so far. The order follows map iteration and is therefore not
// stable between calls.
func (c *prometheusConverterV2) timeSeries() []writev2.TimeSeries {
	series := make([]writev2.TimeSeries, len(c.unique))
	next := 0
	for _, ts := range c.unique {
		series[next] = *ts
		next++
	}
	return series
}
// addSample records sample for the time series identified by lbls and returns
// that series.
//
// Every label name and value is passed through the converter's symbol table,
// and the returned references become the LabelsRefs of a newly created series.
// If a series with the same signature is already registered, the sample is
// appended to it instead of replacing it, so earlier samples are not lost.
//
// It returns nil, and records nothing, when sample is nil or lbls is empty.
func (c *prometheusConverterV2) addSample(sample *writev2.Sample, lbls []prompb.Label) *writev2.TimeSeries {
	if sample == nil || len(lbls) == 0 {
		// This shouldn't happen
		return nil
	}

	// Two references per label: name first, then value.
	refs := make([]uint32, 0, len(lbls)*2)
	for _, l := range lbls {
		refs = append(refs, c.symbolTable.Symbolize(l.Name), c.symbolTable.Symbolize(l.Value))
	}

	// The signature is computed after the references are built so that the
	// reference order matches the order lbls had on entry, whatever
	// timeSeriesSignature does with the slice.
	sig := timeSeriesSignature(lbls)

	// NOTE: distinct label sets that collide on the same signature are still
	// not told apart here.
	if existing, ok := c.unique[sig]; ok {
		existing.Samples = append(existing.Samples, *sample)
		return existing
	}

	ts := &writev2.TimeSeries{
		LabelsRefs: refs,
		Samples:    []writev2.Sample{*sample},
	}
	c.unique[sig] = ts
	return ts
}