Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/skywalking] Add skywalking metrics exporter #6528

Merged
merged 17 commits into from
Jan 3, 2022
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

- `spanmetricproccessor`: use an LRU cache for the cached Dimensions key-value pairs (#2179)

- `skywalkingexporter`: add skywalking metrics exporter (#6528)

## v0.41.0

## 🛑 Breaking changes 🛑
Expand Down
2 changes: 1 addition & 1 deletion cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ require (
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.1.2 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
skywalking.apache.org/repo/goapi v0.0.0-20210820070710-e10b78bbf481 // indirect
skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21 // indirect
)

// Replace references to modules that are in this repository with their relateive paths
Expand Down
4 changes: 2 additions & 2 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion exporter/skywalkingexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Exports data via gRPC using [skywalking-data-collect-protocol](https://github.com/apache/skywalking-data-collect-protocol) format. By default, this exporter requires TLS and offers queued retry capabilities.

Supported pipeline types: logs
Supported pipeline types: logs, metrics

## Getting Started

Expand Down
20 changes: 18 additions & 2 deletions exporter/skywalkingexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func NewFactory() component.ExporterFactory {
return exporterhelper.NewFactory(
typeStr,
createDefaultConfig,
exporterhelper.WithLogs(createLogsExporter))
exporterhelper.WithLogs(createLogsExporter),
exporterhelper.WithMetrics(createMetricsExporter))
}

func createDefaultConfig() config.Exporter {
Expand All @@ -58,7 +59,7 @@ func createLogsExporter(
cfg config.Exporter,
) (component.LogsExporter, error) {
oCfg := cfg.(*Config)
oce := newExporter(ctx, oCfg, set.TelemetrySettings)
oce := newLogsExporter(ctx, oCfg, set.TelemetrySettings)
return exporterhelper.NewLogsExporter(
cfg,
set,
Expand All @@ -71,3 +72,18 @@ func createLogsExporter(
exporterhelper.WithShutdown(oce.shutdown),
)
}

func createMetricsExporter(ctx context.Context, set component.ExporterCreateSettings, cfg config.Exporter) (component.MetricsExporter, error) {
oCfg := cfg.(*Config)
oce := newMetricsExporter(ctx, oCfg, set.TelemetrySettings)
return exporterhelper.NewMetricsExporter(
cfg,
set,
oce.pushMetrics,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithRetry(oCfg.RetrySettings),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithTimeout(oCfg.TimeoutSettings),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown))
}
2 changes: 2 additions & 0 deletions exporter/skywalkingexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ func TestCreateTracesExporter(t *testing.T) {
set := componenttest.NewNopExporterCreateSettings()
tExporter, tErr := createLogsExporter(context.Background(), set, &tt.config)
checkErrorsAndStartAndShutdown(t, tExporter, tErr, tt.mustFailOnCreate, tt.mustFailOnStart)
tExporter2, tErr2 := createMetricsExporter(context.Background(), set, &tt.config)
checkErrorsAndStartAndShutdown(t, tExporter2, tErr2, tt.mustFailOnCreate, tt.mustFailOnStart)
})
}
}
Expand Down
14 changes: 7 additions & 7 deletions exporter/skywalkingexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/skywal
go 1.17

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.41.0
liqiangz marked this conversation as resolved.
Show resolved Hide resolved
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.41.1-0.20211222180302-3db1d1146483
go.opentelemetry.io/collector/model v0.41.1-0.20211222180302-3db1d1146483
google.golang.org/grpc v1.43.0
skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21
)

require (
cloud.google.com/go v0.99.0 // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.1 // indirect
Expand All @@ -24,6 +28,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/mostynb/go-grpc-compression v1.1.15 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/spf13/cast v1.4.1 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.28.0 // indirect
Expand All @@ -34,19 +39,14 @@ require (
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.19.1 // indirect
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.41.0
github.com/rogpeppe/go-internal v1.8.0 // indirect
google.golang.org/grpc v1.43.0
skywalking.apache.org/repo/goapi v0.0.0-20210820070710-e10b78bbf481
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal
5 changes: 3 additions & 2 deletions exporter/skywalkingexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions exporter/skywalkingexporter/logrecord_to_logdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ func resourceToLogData(resource pdata.Resource, logData *logpb.LogData) {
}

attrs.Range(func(k string, v pdata.AttributeValue) bool {
if k == conventions.AttributeServiceName || k == conventions.AttributeServiceInstanceID {
return true
}
liqiangz marked this conversation as resolved.
Show resolved Hide resolved
logData.Tags.Data = append(logData.Tags.Data, &common.KeyStringValuePair{
Key: k,
Value: v.AsString(),
Expand Down
241 changes: 241 additions & 0 deletions exporter/skywalkingexporter/metricrecord_to_metricdata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package skywalkingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/skywalkingexporter"

import (
"strconv"

"go.opentelemetry.io/collector/model/pdata"
conventions "go.opentelemetry.io/collector/model/semconv/v1.5.0"
metricpb "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)

const (
defaultServiceInstance = "otel-collector-instance"
)

func resourceToMetricLabels(resource pdata.Resource) []*metricpb.Label {
attrs := resource.Attributes()
labels := make([]*metricpb.Label, 0, attrs.Len())
attrs.Range(func(k string, v pdata.AttributeValue) bool {
labels = append(labels,
&metricpb.Label{
Name: k,
Value: v.AsString(),
})
return true
})
return labels
}

func resourceToServiceInfo(resource pdata.Resource) (service string, serviceInstance string) {
attrs := resource.Attributes()
if serviceName, ok := attrs.Get(conventions.AttributeServiceName); ok {
service = serviceName.AsString()
} else {
service = defaultServiceName
}
if serviceInstanceID, ok := attrs.Get(conventions.AttributeServiceInstanceID); ok {
serviceInstance = serviceInstanceID.AsString()
} else {
serviceInstance = defaultServiceInstance
}
return service, serviceInstance
}

func numberMetricsToData(name string, data pdata.NumberDataPointSlice, defaultLabels []*metricpb.Label) (metrics []*metricpb.MeterData) {
metrics = make([]*metricpb.MeterData, 0, data.Len())
for i := 0; i < data.Len(); i++ {
dataPoint := data.At(i)
attributeMap := dataPoint.Attributes()
labels := make([]*metricpb.Label, 0, attributeMap.Len()+len(defaultLabels))
attributeMap.Range(func(k string, v pdata.AttributeValue) bool {
labels = append(labels, &metricpb.Label{Name: k, Value: v.AsString()})
return true
})

for _, label := range defaultLabels {
labels = append(labels, &metricpb.Label{Name: label.Name, Value: label.Value})
}
meterData := &metricpb.MeterData{}
sv := &metricpb.MeterData_SingleValue{SingleValue: &metricpb.MeterSingleValue{}}
sv.SingleValue.Labels = labels
meterData.Timestamp = dataPoint.Timestamp().AsTime().UnixMilli()
sv.SingleValue.Name = name
switch dataPoint.Type() {
case pdata.MetricValueTypeInt:
sv.SingleValue.Value = float64(dataPoint.IntVal())
case pdata.MetricValueTypeDouble:
sv.SingleValue.Value = dataPoint.DoubleVal()
}
meterData.Metric = sv
metrics = append(metrics, meterData)
}
return metrics
}

func doubleHistogramMetricsToData(name string, data pdata.HistogramDataPointSlice, defaultLabels []*metricpb.Label) (metrics []*metricpb.MeterData) {
metrics = make([]*metricpb.MeterData, 0, 3*data.Len())
for i := 0; i < data.Len(); i++ {
dataPoint := data.At(i)
attributeMap := dataPoint.Attributes()
labels := make([]*metricpb.Label, 0, attributeMap.Len()+len(defaultLabels))
attributeMap.Range(func(k string, v pdata.AttributeValue) bool {
labels = append(labels, &metricpb.Label{Name: k, Value: v.AsString()})
return true
})

for _, label := range defaultLabels {
labels = append(labels, &metricpb.Label{Name: label.Name, Value: label.Value})
}

meterData := &metricpb.MeterData{}
hg := &metricpb.MeterData_Histogram{Histogram: &metricpb.MeterHistogram{}}
hg.Histogram.Labels = labels
hg.Histogram.Name = name
bounds := dataPoint.ExplicitBounds()
bucketCount := len(dataPoint.BucketCounts())

if bucketCount > 0 {
hg.Histogram.Values = append(hg.Histogram.Values, &metricpb.MeterBucketValue{Count: int64(dataPoint.BucketCounts()[0]), IsNegativeInfinity: true})
}
for i := 1; i < bucketCount && i-1 < len(bounds); i++ {
hg.Histogram.Values = append(hg.Histogram.Values, &metricpb.MeterBucketValue{Bucket: bounds[i-1], Count: int64(dataPoint.BucketCounts()[i])})
}

meterData.Metric = hg
meterData.Timestamp = dataPoint.Timestamp().AsTime().UnixMilli()
metrics = append(metrics, meterData)

meterDataSum := &metricpb.MeterData{}
svs := &metricpb.MeterData_SingleValue{SingleValue: &metricpb.MeterSingleValue{}}
svs.SingleValue.Labels = labels
svs.SingleValue.Name = name + "_sum"
svs.SingleValue.Value = dataPoint.Sum()
meterDataSum.Metric = svs
meterDataSum.Timestamp = dataPoint.Timestamp().AsTime().UnixMilli()
metrics = append(metrics, meterDataSum)

meterDataCount := &metricpb.MeterData{}
svc := &metricpb.MeterData_SingleValue{SingleValue: &metricpb.MeterSingleValue{}}
svc.SingleValue.Labels = labels
meterDataCount.Timestamp = dataPoint.Timestamp().AsTime().UnixMilli()
svc.SingleValue.Name = name + "_count"
svc.SingleValue.Value = float64(dataPoint.Count())
meterDataCount.Metric = svc
metrics = append(metrics, meterDataCount)
}
return metrics
}

func doubleSummaryMetricsToData(name string, data pdata.SummaryDataPointSlice, defaultLabels []*metricpb.Label) (metrics []*metricpb.MeterData) {
metrics = make([]*metricpb.MeterData, 0, 3*data.Len())
for i := 0; i < data.Len(); i++ {
dataPoint := data.At(i)
attributeMap := dataPoint.Attributes()
labels := make([]*metricpb.Label, 0, attributeMap.Len()+len(defaultLabels))
attributeMap.Range(func(k string, v pdata.AttributeValue) bool {
labels = append(labels, &metricpb.Label{Name: k, Value: v.AsString()})
return true
})

for _, label := range defaultLabels {
labels = append(labels, &metricpb.Label{Name: label.Name, Value: label.Value})
}

values := dataPoint.QuantileValues()
for i := 0; i < values.Len(); i++ {
value := values.At(i)
meterData := &metricpb.MeterData{}
sv := &metricpb.MeterData_SingleValue{SingleValue: &metricpb.MeterSingleValue{}}
svLabels := make([]*metricpb.Label, 0, len(labels)+1)
svLabels = append(svLabels, labels...)
svLabels = append(svLabels, &metricpb.Label{Name: "quantile", Value: strconv.FormatFloat(value.Quantile(), 'g', -1, 64)})
sv.SingleValue.Labels = svLabels
meterData.Timestamp = dataPoint.Timestamp().AsTime().UnixMilli()
sv.SingleValue.Name = name
sv.SingleValue.Value = value.Value()
meterData.Metric = sv
metrics = append(metrics, meterData)
}

meterDataSum := &metricpb.MeterData{}
svs := &metricpb.MeterData_SingleValue{SingleValue: &metricpb.MeterSingleValue{}}
svs.SingleValue.Labels = labels
svs.SingleValue.Name = name + "_sum"
svs.SingleValue.Value = dataPoint.Sum()
meterDataSum.Metric = svs
meterDataSum.Timestamp = dataPoint.Timestamp().AsTime().UnixMilli()
metrics = append(metrics, meterDataSum)

meterDataCount := &metricpb.MeterData{}
svc := &metricpb.MeterData_SingleValue{SingleValue: &metricpb.MeterSingleValue{}}
svc.SingleValue.Labels = labels
meterDataCount.Timestamp = dataPoint.Timestamp().AsTime().UnixMilli()
svc.SingleValue.Name = name + "_count"
svc.SingleValue.Value = float64(dataPoint.Count())
meterDataCount.Metric = svc
metrics = append(metrics, meterDataCount)
}
return metrics
}

func metricDataToSwMetricData(md pdata.Metric, defaultLabels []*metricpb.Label) (metrics []*metricpb.MeterData) {
switch md.DataType() {
case pdata.MetricDataTypeNone:
break
case pdata.MetricDataTypeGauge:
return numberMetricsToData(md.Name(), md.Gauge().DataPoints(), defaultLabels)
case pdata.MetricDataTypeSum:
return numberMetricsToData(md.Name(), md.Sum().DataPoints(), defaultLabels)
case pdata.MetricDataTypeHistogram:
return doubleHistogramMetricsToData(md.Name(), md.Histogram().DataPoints(), defaultLabels)
case pdata.MetricDataTypeSummary:
return doubleSummaryMetricsToData(md.Name(), md.Summary().DataPoints(), defaultLabels)
}
return nil
}

func metricsRecordToMetricData(
md pdata.Metrics,
) (metrics *metricpb.MeterDataCollection) {
resMetrics := md.ResourceMetrics()
for i := 0; i < resMetrics.Len(); i++ {
resMetricSlice := resMetrics.At(i)
labels := resourceToMetricLabels(resMetricSlice.Resource())
service, serviceInstance := resourceToServiceInfo(resMetricSlice.Resource())
insMetricSlice := resMetricSlice.InstrumentationLibraryMetrics()
metrics = &metricpb.MeterDataCollection{}
metrics.MeterData = make([]*metricpb.MeterData, 0)
for j := 0; j < insMetricSlice.Len(); j++ {
insMetrics := insMetricSlice.At(j)
// ignore insMetrics.InstrumentationLibrary()
metricSlice := insMetrics.Metrics()
for k := 0; k < metricSlice.Len(); k++ {
oneMetric := metricSlice.At(k)
ms := metricDataToSwMetricData(oneMetric, labels)
if ms == nil {
continue
}
for _, m := range ms {
m.Service = service
m.ServiceInstance = serviceInstance
}
metrics.MeterData = append(metrics.MeterData, ms...)
}
}
}
return metrics
}
Loading