-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
Copy pathmetrics.go
130 lines (112 loc) · 3.93 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package fanoutconsumer // import "go.opentelemetry.io/collector/internal/fanoutconsumer"
import (
"context"
"fmt"
"go.uber.org/multierr"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
)
// NewMetrics wraps multiple metrics consumers in a single one.
// It fanouts the incoming data to all the consumers, and does smart routing:
// - Clones only to the consumer that needs to mutate the data.
// - If all consumers needs to mutate the data one will get the original mutable data.
func NewMetrics(mcs []consumer.Metrics) consumer.Metrics {
// Don't wrap if there is only one non-mutating consumer.
if len(mcs) == 1 && !mcs[0].Capabilities().MutatesData {
return mcs[0]
}
mc := &metricsConsumer{}
for i := 0; i < len(mcs); i++ {
if mcs[i].Capabilities().MutatesData {
mc.mutable = append(mc.mutable, mcs[i])
} else {
mc.readonly = append(mc.readonly, mcs[i])
}
}
return mc
}
type metricsConsumer struct {
mutable []consumer.Metrics
readonly []consumer.Metrics
}
func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
// ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one.
func (msc *metricsConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
var errs error
if len(msc.mutable) > 0 {
// Clone the data before sending to all mutating consumers except the last one.
for i := 0; i < len(msc.mutable)-1; i++ {
errs = multierr.Append(errs, msc.mutable[i].ConsumeMetrics(ctx, cloneMetrics(md)))
}
// Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the
// data is mutable. Never share the same data between a mutating and a non-mutating consumer since the
// non-mutating consumer may process data async and the mutating consumer may change the data before that.
lastConsumer := msc.mutable[len(msc.mutable)-1]
if len(msc.readonly) == 0 && !md.IsReadOnly() {
errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, md))
} else {
errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, cloneMetrics(md)))
}
}
// Mark the data as read-only if it will be sent to more than one read-only consumer.
if len(msc.readonly) > 1 && !md.IsReadOnly() {
md.MarkReadOnly()
}
for _, mc := range msc.readonly {
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, md))
}
return errs
}
func cloneMetrics(md pmetric.Metrics) pmetric.Metrics {
clonedMetrics := pmetric.NewMetrics()
md.CopyTo(clonedMetrics)
return clonedMetrics
}
var _ connector.MetricsRouter = (*metricsRouter)(nil)
type metricsRouter struct {
consumer.Metrics
consumers map[component.ID]consumer.Metrics
}
func NewMetricsRouter(cm map[component.ID]consumer.Metrics) consumer.Metrics {
consumers := make([]consumer.Metrics, 0, len(cm))
for _, consumer := range cm {
consumers = append(consumers, consumer)
}
return &metricsRouter{
Metrics: NewMetrics(consumers),
consumers: cm,
}
}
func (r *metricsRouter) PipelineIDs() []component.ID {
ids := make([]component.ID, 0, len(r.consumers))
for id := range r.consumers {
ids = append(ids, id)
}
return ids
}
func (r *metricsRouter) Consumer(pipelineIDs ...component.ID) (consumer.Metrics, error) {
if len(pipelineIDs) == 0 {
return nil, fmt.Errorf("missing consumers")
}
consumers := make([]consumer.Metrics, 0, len(pipelineIDs))
var errors error
for _, pipelineID := range pipelineIDs {
c, ok := r.consumers[pipelineID]
if ok {
consumers = append(consumers, c)
} else {
errors = multierr.Append(errors, fmt.Errorf("missing consumer: %q", pipelineID))
}
}
if errors != nil {
// TODO potentially this could return a NewMetrics with the valid consumers
return nil, errors
}
return NewMetrics(consumers), nil
}