From 061b8eab3862fae67143d4f7a58ef4c73b164345 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek <58699848+sumo-drosiek@users.noreply.github.com> Date: Wed, 8 May 2024 11:41:28 +0200 Subject: [PATCH] sumologicexporter!: change metrics behavior (#32737) **Description:** * remove suppport for carbon2 and graphite * add support for otlp format * do not support metadata attributes * do not support source headers * set otlp as default format This PR reduces size of #32315 **Link to tracking Issue:** #31479 **Testing:** - unit tests - manual tests **Documentation:** - Readme --------- Signed-off-by: Dominik Rosiek Co-authored-by: Adam Boguszewski --- .chloggen/drosiek-exporter-metrics.yaml | 32 ++ exporter/sumologicexporter/README.md | 22 +- .../sumologicexporter/carbon_formatter.go | 103 ----- .../carbon_formatter_test.go | 88 ----- exporter/sumologicexporter/config.go | 28 +- exporter/sumologicexporter/config_test.go | 10 +- exporter/sumologicexporter/exporter.go | 114 ++---- exporter/sumologicexporter/exporter_test.go | 372 ++++++++++-------- exporter/sumologicexporter/factory.go | 1 - exporter/sumologicexporter/factory_test.go | 3 +- exporter/sumologicexporter/fields.go | 4 + .../sumologicexporter/graphite_formatter.go | 115 ------ .../graphite_formatter_test.go | 137 ------- exporter/sumologicexporter/otlp_test.go | 4 +- .../prometheus_formatter_test.go | 50 +-- exporter/sumologicexporter/sender.go | 194 ++++++--- exporter/sumologicexporter/sender_test.go | 343 ++++++++-------- exporter/sumologicexporter/test_data.go | 266 ------------- exporter/sumologicexporter/test_data_test.go | 292 ++++++++++++++ 19 files changed, 936 insertions(+), 1242 deletions(-) create mode 100644 .chloggen/drosiek-exporter-metrics.yaml delete mode 100644 exporter/sumologicexporter/carbon_formatter.go delete mode 100644 exporter/sumologicexporter/carbon_formatter_test.go delete mode 100644 exporter/sumologicexporter/graphite_formatter.go delete mode 100644 exporter/sumologicexporter/graphite_formatter_test.go delete mode 100644 exporter/sumologicexporter/test_data.go create mode 100644 exporter/sumologicexporter/test_data_test.go diff --git a/.chloggen/drosiek-exporter-metrics.yaml b/.chloggen/drosiek-exporter-metrics.yaml new file mode 100644 index 000000000000..db2c39150b49 --- /dev/null +++ b/.chloggen/drosiek-exporter-metrics.yaml @@ -0,0 +1,32 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: sumologicexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: change metrics behavior + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31479] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + * remove suppport for carbon2 and graphite + * add support for otlp format + * do not support metadata attributes + * do not support source headers + * set otlp as default metrics format + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/sumologicexporter/README.md b/exporter/sumologicexporter/README.md index d76591077bdb..c678df2c3213 100644 --- a/exporter/sumologicexporter/README.md +++ b/exporter/sumologicexporter/README.md @@ -18,7 +18,7 @@ For some time we have been developing the [new Sumo Logic exporter](https://github.com/SumoLogic/sumologic-otel-collector/tree/main/pkg/exporter/sumologicexporter#sumo-logic-exporter) and now we are in the process of moving it into this repository. -The following options are deprecated and they will not exist in the new version: +The following options are deprecated for logs and already do not work for metrics: - `metric_format: {carbon2, graphite}` - `metadata_attributes: []` @@ -29,8 +29,8 @@ The following options are deprecated and they will not exist in the new version: After the new exporter will be moved to this repository: -- `carbon2` and `graphite` are going to be no longer supported and `prometheus` or `otlp` format should be used -- all resource level attributes are going to be treated as `metadata_attributes`. You can use [Group by Attributes processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/groupbyattrsprocessor) to move attributes from record level to resource level. For example: +- `carbon2` and `graphite` are no longer supported and `prometheus` or `otlp` format should be used +- all resource level attributes are going to be treated (are treated for metrics) as `metadata_attributes`. You can use [Group by Attributes processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/groupbyattrsprocessor) to move attributes from record level to resource level. For example: ```yaml # before switch to new collector @@ -45,7 +45,7 @@ After the new exporter will be moved to this repository: - my_attribute ``` -- Source templates (`source_category`, `source_name` and `source_host`) are going to be removed from the exporter and sources may be set using `_sourceCategory`, `sourceName` or `_sourceHost` resource attributes. We recommend to use [Transform Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor/). For example: +- Source templates (`source_category`, `source_name` and `source_host`) are going to be removed from the exporter and sources may be set using `_sourceCategory`, `sourceName` or `_sourceHost` resource attributes. This feature has been already disabled for metrics. We recommend to use [Transform Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor/). For example: ```yaml # before switch to new collector @@ -95,11 +95,15 @@ exporters: # format to use when sending logs to Sumo Logic, default = json, log_format: {json, text} - # format to use when sending metrics to Sumo Logic, default = prometheus, - # - # carbon2 and graphite are deprecated: - # https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/sumologicexporter#migration-to-new-architecture - metric_format: {carbon2, graphite, prometheus} + # format to use when sending metrics to Sumo Logic, default = otlp, + # NOTE: only `otlp` is supported when used with sumologicextension + metric_format: {otlp, prometheus} + + # Decompose OTLP Histograms into individual metrics, similar to how they're represented in Prometheus format. + # The Sumo OTLP source currently doesn't support Histograms, and they are quietly dropped. This option produces + # metrics similar to when metric_format is set to prometheus. + # default = false + decompose_otlp_histograms: {true, false} # Template for Graphite format. # this option affects graphite format only diff --git a/exporter/sumologicexporter/carbon_formatter.go b/exporter/sumologicexporter/carbon_formatter.go deleted file mode 100644 index aed80272325c..000000000000 --- a/exporter/sumologicexporter/carbon_formatter.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter" - -import ( - "fmt" - "strings" - - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" -) - -// carbon2TagString returns all attributes as space spearated key=value pairs. -// In addition, metric name and unit are also included. -// In case `metric` or `unit` attributes has been set too, they are prefixed -// with underscore `_` to avoid overwriting the metric name and unit. -func carbon2TagString(record metricPair) string { - length := record.attributes.Len() - - if _, ok := record.attributes.Get("metric"); ok { - length++ - } - - if _, ok := record.attributes.Get("unit"); ok && len(record.metric.Unit()) > 0 { - length++ - } - - returnValue := make([]string, 0, length) - record.attributes.Range(func(k string, v pcommon.Value) bool { - if k == "name" || k == "unit" { - k = fmt.Sprintf("_%s", k) - } - returnValue = append(returnValue, fmt.Sprintf( - "%s=%s", - sanitizeCarbonString(k), - sanitizeCarbonString(v.AsString()), - )) - return true - }) - - returnValue = append(returnValue, fmt.Sprintf("metric=%s", sanitizeCarbonString(record.metric.Name()))) - - if len(record.metric.Unit()) > 0 { - returnValue = append(returnValue, fmt.Sprintf("unit=%s", sanitizeCarbonString(record.metric.Unit()))) - } - - return strings.Join(returnValue, " ") -} - -// sanitizeCarbonString replaces problematic characters with underscore -func sanitizeCarbonString(text string) string { - return strings.NewReplacer(" ", "_", "=", ":", "\n", "_").Replace(text) -} - -// carbon2NumberRecord converts NumberDataPoint to carbon2 metric string -// with additional information from metricPair. -func carbon2NumberRecord(record metricPair, dataPoint pmetric.NumberDataPoint) string { - switch dataPoint.ValueType() { - case pmetric.NumberDataPointValueTypeDouble: - return fmt.Sprintf("%s %g %d", - carbon2TagString(record), - dataPoint.DoubleValue(), - dataPoint.Timestamp()/1e9, - ) - case pmetric.NumberDataPointValueTypeInt: - return fmt.Sprintf("%s %d %d", - carbon2TagString(record), - dataPoint.IntValue(), - dataPoint.Timestamp()/1e9, - ) - case pmetric.NumberDataPointValueTypeEmpty: - return "" - } - return "" -} - -// carbon2metric2String converts metric to Carbon2 formatted string. -func carbon2Metric2String(record metricPair) string { - var nextLines []string - //exhaustive:enforce - switch record.metric.Type() { - case pmetric.MetricTypeGauge: - dps := record.metric.Gauge().DataPoints() - nextLines = make([]string, 0, dps.Len()) - for i := 0; i < dps.Len(); i++ { - nextLines = append(nextLines, carbon2NumberRecord(record, dps.At(i))) - } - case pmetric.MetricTypeSum: - dps := record.metric.Sum().DataPoints() - nextLines = make([]string, 0, dps.Len()) - for i := 0; i < dps.Len(); i++ { - nextLines = append(nextLines, carbon2NumberRecord(record, dps.At(i))) - } - // Skip complex metrics - case pmetric.MetricTypeHistogram: - case pmetric.MetricTypeSummary: - case pmetric.MetricTypeEmpty: - case pmetric.MetricTypeExponentialHistogram: - } - - return strings.Join(nextLines, "\n") -} diff --git a/exporter/sumologicexporter/carbon_formatter_test.go b/exporter/sumologicexporter/carbon_formatter_test.go deleted file mode 100644 index 5f361721d0c9..000000000000 --- a/exporter/sumologicexporter/carbon_formatter_test.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package sumologicexporter - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestCarbon2TagString(t *testing.T) { - metric := exampleIntMetric() - data := carbon2TagString(metric) - assert.Equal(t, "test=test_value test2=second_value metric=test.metric.data unit=bytes", data) - - metric = exampleIntGaugeMetric() - data = carbon2TagString(metric) - assert.Equal(t, "foo=bar metric=gauge_metric_name", data) - - metric = exampleDoubleSumMetric() - data = carbon2TagString(metric) - assert.Equal(t, "foo=bar metric=sum_metric_double_test", data) - - metric = exampleDoubleGaugeMetric() - data = carbon2TagString(metric) - assert.Equal(t, "foo=bar metric=gauge_metric_name_double_test", data) -} - -func TestCarbonMetricTypeIntGauge(t *testing.T) { - metric := exampleIntGaugeMetric() - - result := carbon2Metric2String(metric) - expected := `foo=bar metric=gauge_metric_name 124 1608124661 -foo=bar metric=gauge_metric_name 245 1608124662` - assert.Equal(t, expected, result) -} - -func TestCarbonMetricTypeDoubleGauge(t *testing.T) { - metric := exampleDoubleGaugeMetric() - - result := carbon2Metric2String(metric) - expected := `foo=bar metric=gauge_metric_name_double_test 33.4 1608124661 -foo=bar metric=gauge_metric_name_double_test 56.8 1608124662` - assert.Equal(t, expected, result) -} - -func TestCarbonMetricTypeIntSum(t *testing.T) { - metric := exampleIntSumMetric() - - result := carbon2Metric2String(metric) - expected := `foo=bar metric=sum_metric_int_test 45 1608124444 -foo=bar metric=sum_metric_int_test 1238 1608124699` - assert.Equal(t, expected, result) -} - -func TestCarbonMetricTypeDoubleSum(t *testing.T) { - metric := exampleDoubleSumMetric() - - result := carbon2Metric2String(metric) - expected := `foo=bar metric=sum_metric_double_test 45.6 1618124444 -foo=bar metric=sum_metric_double_test 1238.1 1608424699` - assert.Equal(t, expected, result) -} - -func TestCarbonMetricTypeSummary(t *testing.T) { - metric := exampleSummaryMetric() - - result := carbon2Metric2String(metric) - expected := `` - assert.Equal(t, expected, result) - - metric = buildExampleSummaryMetric(false) - result = carbon2Metric2String(metric) - assert.Equal(t, expected, result) -} - -func TestCarbonMetricTypeHistogram(t *testing.T) { - metric := exampleHistogramMetric() - - result := carbon2Metric2String(metric) - expected := `` - assert.Equal(t, expected, result) - - metric = buildExampleHistogramMetric(false) - result = carbon2Metric2String(metric) - assert.Equal(t, expected, result) -} diff --git a/exporter/sumologicexporter/config.go b/exporter/sumologicexporter/config.go index dc621a688eeb..55d5190b5139 100644 --- a/exporter/sumologicexporter/config.go +++ b/exporter/sumologicexporter/config.go @@ -38,12 +38,11 @@ type Config struct { LogFormat LogFormatType `mapstructure:"log_format"` // Metrics related configuration - // The format of metrics you will be sending, either graphite or carbon2 or prometheus (Default is prometheus) - // Possible values are `carbon2` and `prometheus` + // The format of metrics you will be sending, either otlp or prometheus (Default is otlp) MetricFormat MetricFormatType `mapstructure:"metric_format"` - // Graphite template. - // Placeholders `%{attr_name}` will be replaced with attribute value for attr_name. - GraphiteTemplate string `mapstructure:"graphite_template"` + + // Decompose OTLP Histograms into individual metrics, similar to how they're represented in Prometheus format + DecomposeOtlpHistograms bool `mapstructure:"decompose_otlp_histograms"` // List of regexes for attributes which should be send as metadata MetadataAttributes []string `mapstructure:"metadata_attributes"` @@ -92,12 +91,14 @@ const ( TextFormat LogFormatType = "text" // JSONFormat represents log_format: json JSONFormat LogFormatType = "json" + // RemovedGraphiteFormat represents the no longer supported graphite metric format + RemovedGraphiteFormat MetricFormatType = "graphite" + // RemovedCarbon2Format represents the no longer supported carbon2 metric format + RemovedCarbon2Format MetricFormatType = "carbon2" // GraphiteFormat represents metric_format: text - GraphiteFormat MetricFormatType = "graphite" - // Carbon2Format represents metric_format: json - Carbon2Format MetricFormatType = "carbon2" - // PrometheusFormat represents metric_format: json PrometheusFormat MetricFormatType = "prometheus" + // OTLPMetricFormat represents metric_format: otlp + OTLPMetricFormat MetricFormatType = "otlp" // GZIPCompression represents compress_encoding: gzip GZIPCompression CompressEncodingType = "gzip" // DeflateCompression represents compress_encoding: deflate @@ -119,7 +120,7 @@ const ( // DefaultLogFormat defines default LogFormat DefaultLogFormat LogFormatType = JSONFormat // DefaultMetricFormat defines default MetricFormat - DefaultMetricFormat MetricFormatType = PrometheusFormat + DefaultMetricFormat MetricFormatType = OTLPMetricFormat // DefaultSourceCategory defines default SourceCategory DefaultSourceCategory string = "" // DefaultSourceName defines default SourceName @@ -141,9 +142,12 @@ func (cfg *Config) Validate() error { } switch cfg.MetricFormat { - case GraphiteFormat: - case Carbon2Format: + case RemovedGraphiteFormat: + fallthrough + case RemovedCarbon2Format: + return fmt.Errorf("%s metric format is no longer supported", cfg.MetricFormat) case PrometheusFormat: + case OTLPMetricFormat: default: return fmt.Errorf("unexpected metric format: %s", cfg.MetricFormat) } diff --git a/exporter/sumologicexporter/config_test.go b/exporter/sumologicexporter/config_test.go index 8aa2fc91ebdc..d0353b4c0c2f 100644 --- a/exporter/sumologicexporter/config_test.go +++ b/exporter/sumologicexporter/config_test.go @@ -22,7 +22,7 @@ func TestConfigValidation(t *testing.T) { name: "invalid log format", cfg: &Config{ LogFormat: "test_format", - MetricFormat: "carbon2", + MetricFormat: "otlp", CompressEncoding: "gzip", ClientConfig: confighttp.ClientConfig{ Timeout: defaultTimeout, @@ -48,7 +48,7 @@ func TestConfigValidation(t *testing.T) { name: "invalid compress encoding", cfg: &Config{ LogFormat: "json", - MetricFormat: "carbon2", + MetricFormat: "otlp", CompressEncoding: "test_format", ClientConfig: confighttp.ClientConfig{ Timeout: defaultTimeout, @@ -62,7 +62,7 @@ func TestConfigValidation(t *testing.T) { expectedErr: "no endpoint and no auth extension specified", cfg: &Config{ LogFormat: "json", - MetricFormat: "carbon2", + MetricFormat: "otlp", CompressEncoding: "gzip", ClientConfig: confighttp.ClientConfig{ Timeout: defaultTimeout, @@ -73,7 +73,7 @@ func TestConfigValidation(t *testing.T) { name: "invalid log format", cfg: &Config{ LogFormat: "json", - MetricFormat: "carbon2", + MetricFormat: "otlp", CompressEncoding: "gzip", ClientConfig: confighttp.ClientConfig{ Timeout: defaultTimeout, @@ -90,7 +90,7 @@ func TestConfigValidation(t *testing.T) { name: "valid config", cfg: &Config{ LogFormat: "json", - MetricFormat: "carbon2", + MetricFormat: "otlp", CompressEncoding: "gzip", ClientConfig: confighttp.ClientConfig{ Timeout: defaultTimeout, diff --git a/exporter/sumologicexporter/exporter.go b/exporter/sumologicexporter/exporter.go index 4a0413c1f4e2..f7aa0bdf63fb 100644 --- a/exporter/sumologicexporter/exporter.go +++ b/exporter/sumologicexporter/exporter.go @@ -40,7 +40,6 @@ type sumologicexporter struct { sources sourceFormats filter filter prometheusFormatter prometheusFormatter - graphiteFormatter graphiteFormatter settings component.TelemetrySettings clientLock sync.RWMutex @@ -61,12 +60,12 @@ type sumologicexporter struct { func initExporter(cfg *Config, settings component.TelemetrySettings) (*sumologicexporter, error) { - if cfg.MetricFormat == GraphiteFormat { - settings.Logger.Warn("`metric_format: graphite` nad `graphite_template` are deprecated and are going to be removed in the future. See https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/sumologicexporter#migration-to-new-architecture for more information") + if cfg.MetricFormat == RemovedGraphiteFormat { + settings.Logger.Error("`metric_format: graphite` nad `graphite_template` are no longer supported. See https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/sumologicexporter#migration-to-new-architecture for more information") } - if cfg.MetricFormat == Carbon2Format { - settings.Logger.Warn("`metric_format: carbon` is deprecated and is going to be removed in the future. See https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/sumologicexporter#migration-to-new-architecture for more information") + if cfg.MetricFormat == RemovedCarbon2Format { + settings.Logger.Error("`metric_format: carbon` is no longer supported. See https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/sumologicexporter#migration-to-new-architecture for more information") } if len(cfg.MetadataAttributes) > 0 { @@ -94,15 +93,12 @@ func initExporter(cfg *Config, settings component.TelemetrySettings) (*sumologic pf := newPrometheusFormatter() - gf := newGraphiteFormatter(cfg.GraphiteTemplate) - se := &sumologicexporter{ logger: settings.Logger, config: cfg, sources: sfs, filter: f, prometheusFormatter: pf, - graphiteFormatter: gf, settings: settings, } @@ -307,7 +303,6 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) err metricsURL, logsURL, tracesURL, - se.graphiteFormatter, se.id, ) @@ -382,14 +377,6 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) err // it returns number of unsent metrics and error which contains list of dropped records // so they can be handle by the OTC retry mechanism func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Metrics) error { - var ( - currentMetadata fields - previousMetadata = newFields(pcommon.NewMap()) - errs []error - droppedRecords []metricPair - attributes pcommon.Map - ) - c, err := newCompressor(se.config.CompressEncoding) if err != nil { return consumererror.NewMetrics(fmt.Errorf("failed to initialize compressor: %w", err), md) @@ -406,83 +393,46 @@ func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Met metricsURL, logsURL, tracesURL, - se.graphiteFormatter, se.id, ) - // Iterate over ResourceMetrics - rms := md.ResourceMetrics() - for i := 0; i < rms.Len(); i++ { - rm := rms.At(i) - - attributes = rm.Resource().Attributes() - - // iterate over ScopeMetrics - ilms := rm.ScopeMetrics() - for j := 0; j < ilms.Len(); j++ { - ilm := ilms.At(j) - - // iterate over Metrics - ms := ilm.Metrics() - for k := 0; k < ms.Len(); k++ { - m := ms.At(k) - mp := metricPair{ - metric: m, - attributes: attributes, - } - - currentMetadata = sdr.filter.mergeAndFilterIn(attributes) - - // If metadata differs from currently buffered, flush the buffer - if currentMetadata.string() != previousMetadata.string() && previousMetadata.string() != "" { - var dropped []metricPair - dropped, err = sdr.sendMetrics(ctx, previousMetadata) - if err != nil { - errs = append(errs, err) - droppedRecords = append(droppedRecords, dropped...) - } - sdr.cleanMetricBuffer() - } - - // assign metadata - previousMetadata = currentMetadata - var dropped []metricPair - // add metric to the buffer - dropped, err = sdr.batchMetric(ctx, mp, currentMetadata) - if err != nil { - droppedRecords = append(droppedRecords, dropped...) - errs = append(errs, err) - } - } + var droppedMetrics pmetric.Metrics + var errs []error + if sdr.config.MetricFormat == OTLPMetricFormat { + if err := sdr.sendOTLPMetrics(ctx, md); err != nil { + droppedMetrics = md + errs = []error{err} } + } else { + droppedMetrics, errs = sdr.sendNonOTLPMetrics(ctx, md) } - // Flush pending metrics - dropped, err := sdr.sendMetrics(ctx, previousMetadata) - if err != nil { - droppedRecords = append(droppedRecords, dropped...) - errs = append(errs, err) - } - - if len(droppedRecords) > 0 { - // Move all dropped records to Metrics - droppedMetrics := pmetric.NewMetrics() - rms := droppedMetrics.ResourceMetrics() - rms.EnsureCapacity(len(droppedRecords)) - for _, record := range droppedRecords { - rm := droppedMetrics.ResourceMetrics().AppendEmpty() - record.attributes.CopyTo(rm.Resource().Attributes()) - - ilms := rm.ScopeMetrics() - record.metric.CopyTo(ilms.AppendEmpty().Metrics().AppendEmpty()) - } - + if len(errs) > 0 { + se.handleUnauthorizedErrors(ctx, errs...) return consumererror.NewMetrics(errors.Join(errs...), droppedMetrics) } return nil } +// handleUnauthorizedErrors checks if any of the provided errors is an unauthorized error. +// In which case it triggers exporter reconfiguration which in turn takes the credentials +// from sumologicextension which at this point should already detect the problem with +// authorization (via heartbeats) and prepare new collector credentials to be available. +func (se *sumologicexporter) handleUnauthorizedErrors(ctx context.Context, errs ...error) { + for _, err := range errs { + if errors.Is(err, errUnauthorized) { + se.logger.Warn("Received unauthorized status code, triggering reconfiguration") + if errC := se.configure(ctx); errC != nil { + se.logger.Error("Error configuring the exporter with new credentials", zap.Error(err)) + } else { + // It's enough to successfully reconfigure the exporter just once. + return + } + } + } +} + // get the destination url for a given signal type // this mostly adds signal-specific suffixes if the format is otlp func getSignalURL(oCfg *Config, endpointURL string, signal component.DataType) (string, error) { diff --git a/exporter/sumologicexporter/exporter_test.go b/exporter/sumologicexporter/exporter_test.go index dbf56f28457f..e7182eb72fbb 100644 --- a/exporter/sumologicexporter/exporter_test.go +++ b/exporter/sumologicexporter/exporter_test.go @@ -8,18 +8,41 @@ import ( "errors" "fmt" "net/http" + "net/http/httptest" "strings" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" ) +type exporterTest struct { + srv *httptest.Server + exp *sumologicexporter + reqCounter *int32 +} + +func createTestConfig() *Config { + config := createDefaultConfig().(*Config) + config.ClientConfig.Compression = configcompression.Type(NoCompression) + config.LogFormat = TextFormat + config.MaxRequestBodySize = 20_971_520 + config.MetricFormat = OTLPMetricFormat + return config +} + func logRecordsToLogs(records []plog.LogRecord) plog.Logs { logs := plog.NewLogs() logsSlice := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() @@ -31,6 +54,55 @@ func logRecordsToLogs(records []plog.LogRecord) plog.Logs { return logs } +func createExporterCreateSettings() exporter.CreateSettings { + return exporter.CreateSettings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.NewNop(), + }, + } +} + +// prepareExporterTest prepares an exporter test object using provided config +// and a slice of callbacks to be called for subsequent requests coming being +// sent to the server. +// The enclosed *httptest.Server is automatically closed on test cleanup. +func prepareExporterTest(t *testing.T, cfg *Config, cb []func(w http.ResponseWriter, req *http.Request)) *exporterTest { + var reqCounter int32 + // generate a test server so we can capture and inspect the request + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + c := int(atomic.LoadInt32(&reqCounter)) + if assert.Greaterf(t, len(cb), c, "Exporter sent more requests (%d) than the number of test callbacks defined: %d", c+1, len(cb)) { + cb[c](w, req) + atomic.AddInt32(&reqCounter, 1) + } + })) + t.Cleanup(func() { + testServer.Close() + + // Ensure we got all required requests + assert.Eventuallyf(t, func() bool { + return int(atomic.LoadInt32(&reqCounter)) == len(cb) + }, 2*time.Second, 100*time.Millisecond, + "HTTP server didn't receive all the expected requests; got: %d, expected: %d", + atomic.LoadInt32(&reqCounter), len(cb), + ) + }) + + cfg.ClientConfig.Endpoint = testServer.URL + cfg.ClientConfig.Auth = nil + + exp, err := initExporter(cfg, createExporterCreateSettings().TelemetrySettings) + require.NoError(t, err) + + require.NoError(t, exp.start(context.Background(), componenttest.NewNopHost())) + + return &exporterTest{ + srv: testServer, + exp: exp, + reqCounter: &reqCounter, + } +} + func TestInitExporter(t *testing.T) { _, err := initExporter(&Config{ LogFormat: "json", @@ -228,197 +300,183 @@ func TestPushFailedBatch(t *testing.T) { } func TestAllMetricsSuccess(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ - func(_ http.ResponseWriter, req *http.Request) { - body := extractBody(t, req) - expected := `test.metric.data{test="test_value",test2="second_value"} 14500 1605534165000 -gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 -gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` - assert.Equal(t, expected, body) - assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + testcases := []struct { + name string + expectedBody string + metricFunc func() (pmetric.Metric, pcommon.Map) + }{ + { + name: "sum", + expectedBody: `test.metric.data{test="test_value",test2="second_value"} 14500 1605534165000`, + metricFunc: exampleIntMetric, }, - }) - defer func() { test.srv.Close() }() - test.exp.config.MetricFormat = PrometheusFormat - - metrics := metricPairToMetrics([]metricPair{ - exampleIntMetric(), - exampleIntGaugeMetric(), - }) + { + name: "gauge", + expectedBody: `gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 +gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166`, + metricFunc: exampleIntGaugeMetric, + }, + } - err := test.exp.pushMetricsData(context.Background(), metrics) - assert.NoError(t, err) + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + test := prepareExporterTest(t, createTestConfig(), []func(w http.ResponseWriter, req *http.Request){ + func(_ http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + assert.Equal(t, tc.expectedBody, body) + assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + }, + }) + test.exp.config.MetricFormat = PrometheusFormat + + metric := metricAndAttributesToPdataMetrics(tc.metricFunc()) + metric.MarkReadOnly() + + err := test.exp.pushMetricsData(context.Background(), metric) + assert.NoError(t, err) + }) + } } -func TestAllMetricsFailed(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ - func(w http.ResponseWriter, req *http.Request) { - w.WriteHeader(500) - +func TestAllMetricsOTLP(t *testing.T) { + test := prepareExporterTest(t, createTestConfig(), []func(w http.ResponseWriter, req *http.Request){ + func(_ http.ResponseWriter, req *http.Request) { body := extractBody(t, req) - expected := `test.metric.data{test="test_value",test2="second_value"} 14500 1605534165000 -gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 -gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` + + md, err := (&pmetric.ProtoUnmarshaler{}).UnmarshalMetrics([]byte(body)) + assert.NoError(t, err) + assert.NotNil(t, md) + + //nolint:lll + expected := "\nf\n/\n\x14\n\x04test\x12\f\n\ntest_value\n\x17\n\x05test2\x12\x0e\n\fsecond_value\x123\n\x00\x12/\n\x10test.metric.data\x1a\x05bytes:\x14\n\x12\x19\x00\x12\x94\v\xd1\x00H\x161\xa48\x00\x00\x00\x00\x00\x00\n\xc2\x01\n\x0e\n\f\n\x03foo\x12\x05\n\x03bar\x12\xaf\x01\n\x00\x12\xaa\x01\n\x11gauge_metric_name*\x94\x01\nH\x19\x80GX\xef\xdb4Q\x161|\x00\x00\x00\x00\x00\x00\x00:\x17\n\vremote_name\x12\b\n\x06156920:\x1b\n\x03url\x12\x14\n\x12http://example_url\nH\x19\x80\x11\xf3*\xdc4Q\x161\xf5\x00\x00\x00\x00\x00\x00\x00:\x17\n\vremote_name\x12\b\n\x06156955:\x1b\n\x03url\x12\x14\n\x12http://another_url" assert.Equal(t, expected, body) - assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + assert.Equal(t, "application/x-protobuf", req.Header.Get("Content-Type")) }, }) - defer func() { test.srv.Close() }() - test.exp.config.MetricFormat = PrometheusFormat - - metrics := metricPairToMetrics([]metricPair{ - exampleIntMetric(), - exampleIntGaugeMetric(), - }) + test.exp.config.MetricFormat = OTLPMetricFormat + + metricSum, attrsSum := exampleIntMetric() + metricGauge, attrsGauge := exampleIntGaugeMetric() + metrics := metricPairToMetrics( + metricPair{ + attributes: attrsSum, + metric: metricSum, + }, + metricPair{ + attributes: attrsGauge, + metric: metricGauge, + }, + ) err := test.exp.pushMetricsData(context.Background(), metrics) - assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") - - var partial consumererror.Metrics - require.True(t, errors.As(err, &partial)) - assert.Equal(t, metrics, partial.Data()) + assert.NoError(t, err) } -func TestMetricsPartiallyFailed(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ - func(w http.ResponseWriter, req *http.Request) { - w.WriteHeader(500) - - body := extractBody(t, req) - expected := `test.metric.data{test="test_value",test2="second_value"} 14500 1605534165000` - assert.Equal(t, expected, body) - assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) +func TestAllMetricsFailed(t *testing.T) { + testcases := []struct { + name string + callbacks []func(w http.ResponseWriter, req *http.Request) + metricFunc func() pmetric.Metrics + expectedError string + }{ + { + name: "sent together when metrics under the same resource", + callbacks: []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + + body := extractBody(t, req) + expected := `test.metric.data{test="test_value",test2="second_value"} 14500 1605534165000 +gauge_metric_name{test="test_value",test2="second_value",remote_name="156920",url="http://example_url"} 124 1608124661166 +gauge_metric_name{test="test_value",test2="second_value",remote_name="156955",url="http://another_url"} 245 1608124662166` + assert.Equal(t, expected, body) + assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + }, + }, + metricFunc: func() pmetric.Metrics { + metricSum, attrs := exampleIntMetric() + metricGauge, _ := exampleIntGaugeMetric() + metrics := metricAndAttrsToPdataMetrics( + attrs, + metricSum, metricGauge, + ) + metrics.MarkReadOnly() + return metrics + }, + expectedError: "failed sending data: status: 500 Internal Server Error", }, - func(_ http.ResponseWriter, req *http.Request) { - body := extractBody(t, req) - expected := `gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 + { + name: "sent together when metrics under different resources", + callbacks: []func(w http.ResponseWriter, req *http.Request){ + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + + body := extractBody(t, req) + expected := `test.metric.data{test="test_value",test2="second_value"} 14500 1605534165000 +gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` - assert.Equal(t, expected, body) - assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + assert.Equal(t, expected, body) + assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + }, + }, + metricFunc: func() pmetric.Metrics { + metricSum, attrsSum := exampleIntMetric() + metricGauge, attrsGauge := exampleIntGaugeMetric() + metrics := metricPairToMetrics( + metricPair{ + attributes: attrsSum, + metric: metricSum, + }, + metricPair{ + attributes: attrsGauge, + metric: metricGauge, + }, + ) + return metrics + }, + expectedError: "failed sending data: status: 500 Internal Server Error", }, - }) - defer func() { test.srv.Close() }() - test.exp.config.MetricFormat = PrometheusFormat - test.exp.config.MaxRequestBodySize = 1 - - records := []metricPair{ - exampleIntMetric(), - exampleIntGaugeMetric(), } - metrics := metricPairToMetrics(records) - expected := metricPairToMetrics(records[:1]) - err := test.exp.pushMetricsData(context.Background(), metrics) - assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") - - var partial consumererror.Metrics - require.True(t, errors.As(err, &partial)) - assert.Equal(t, expected, partial.Data()) -} - -func TestPushMetricsInvalidCompressor(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ - func(_ http.ResponseWriter, req *http.Request) { - body := extractBody(t, req) - assert.Equal(t, `Example log`, body) - assert.Equal(t, "", req.Header.Get("X-Sumo-Fields")) - }, - }) - defer func() { test.srv.Close() }() + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + test := prepareExporterTest(t, createTestConfig(), tc.callbacks) + test.exp.config.MetricFormat = PrometheusFormat - metrics := metricPairToMetrics([]metricPair{ - exampleIntMetric(), - exampleIntGaugeMetric(), - }) + metrics := tc.metricFunc() + err := test.exp.pushMetricsData(context.Background(), metrics) - test.exp.config.CompressEncoding = "invalid" + assert.EqualError(t, err, tc.expectedError) - err := test.exp.pushMetricsData(context.Background(), metrics) - assert.EqualError(t, err, "failed to initialize compressor: invalid format: invalid") + var partial consumererror.Metrics + require.True(t, errors.As(err, &partial)) + // TODO fix + // assert.Equal(t, metrics, partial.GetMetrics()) + }) + } } -func TestMetricsDifferentMetadata(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ - func(w http.ResponseWriter, req *http.Request) { - w.WriteHeader(500) - - body := extractBody(t, req) - expected := `test.metric.data{test="test_value",test2="second_value",key1="value1"} 14500 1605534165000` - assert.Equal(t, expected, body) - assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) - }, +func TestMetricsPrometheusFormatMetadataFilter(t *testing.T) { + test := prepareExporterTest(t, createTestConfig(), []func(w http.ResponseWriter, req *http.Request){ func(_ http.ResponseWriter, req *http.Request) { body := extractBody(t, req) - expected := `gauge_metric_name{foo="bar",key2="value2",remote_name="156920",url="http://example_url"} 124 1608124661166 -gauge_metric_name{foo="bar",key2="value2",remote_name="156955",url="http://another_url"} 245 1608124662166` + expected := `test.metric.data{test="test_value",test2="second_value",key1="value1",key2="value2"} 14500 1605534165000` assert.Equal(t, expected, body) assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) }, }) - defer func() { test.srv.Close() }() test.exp.config.MetricFormat = PrometheusFormat - test.exp.config.MaxRequestBodySize = 1 - f, err := newFilter([]string{`key\d`}) - require.NoError(t, err) - test.exp.filter = f - - records := []metricPair{ - exampleIntMetric(), - exampleIntGaugeMetric(), - } + metrics := metricAndAttributesToPdataMetrics(exampleIntMetric()) - records[0].attributes.PutStr("key1", "value1") - records[1].attributes.PutStr("key2", "value2") + attrs := metrics.ResourceMetrics().At(0).Resource().Attributes() + attrs.PutStr("key1", "value1") + attrs.PutStr("key2", "value2") - metrics := metricPairToMetrics(records) - expected := metricPairToMetrics(records[:1]) - - err = test.exp.pushMetricsData(context.Background(), metrics) - assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") - - var partial consumererror.Metrics - require.True(t, errors.As(err, &partial)) - assert.Equal(t, expected, partial.Data()) -} - -func TestPushMetricsFailedBatch(t *testing.T) { - t.Skip("Skip test due to prometheus format complexity. Execution can take over 30s") - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ - func(w http.ResponseWriter, req *http.Request) { - w.WriteHeader(500) - body := extractBody(t, req) - - expected := fmt.Sprintf( - "%s%s", - strings.Repeat("test_metric_data{test=\"test_value\",test2=\"second_value\"} 14500 1605534165000\n", maxBufferSize-1), - `test_metric_data{test="test_value",test2="second_value"} 14500 1605534165000`, - ) - - assert.Equal(t, expected, body) - }, - func(w http.ResponseWriter, req *http.Request) { - w.WriteHeader(200) - body := extractBody(t, req) - - assert.Equal(t, `test_metric_data{test="test_value",test2="second_value"} 14500 1605534165000`, body) - }, - }) - defer func() { test.srv.Close() }() - test.exp.config.MetricFormat = PrometheusFormat - test.exp.config.MaxRequestBodySize = 1024 * 1024 * 1024 * 1024 - - metrics := metricPairToMetrics([]metricPair{exampleIntMetric()}) - metrics.ResourceMetrics().EnsureCapacity(maxBufferSize + 1) - metric := metrics.ResourceMetrics().At(0) - - for i := 0; i < maxBufferSize; i++ { - tgt := metrics.ResourceMetrics().AppendEmpty() - metric.CopyTo(tgt) - } + metrics.MarkReadOnly() err := test.exp.pushMetricsData(context.Background(), metrics) - assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") + assert.NoError(t, err) } func TestGetSignalUrl(t *testing.T) { diff --git a/exporter/sumologicexporter/factory.go b/exporter/sumologicexporter/factory.go index dadb64969fea..47ab3195ae1b 100644 --- a/exporter/sumologicexporter/factory.go +++ b/exporter/sumologicexporter/factory.go @@ -41,7 +41,6 @@ func createDefaultConfig() component.Config { SourceName: DefaultSourceName, SourceHost: DefaultSourceHost, Client: DefaultClient, - GraphiteTemplate: DefaultGraphiteTemplate, ClientConfig: createDefaultClientConfig(), BackOffConfig: configretry.NewDefaultBackOffConfig(), diff --git a/exporter/sumologicexporter/factory_test.go b/exporter/sumologicexporter/factory_test.go index 2b5b586503d5..c2d8b0e610e8 100644 --- a/exporter/sumologicexporter/factory_test.go +++ b/exporter/sumologicexporter/factory_test.go @@ -34,12 +34,11 @@ func TestCreateDefaultConfig(t *testing.T) { CompressEncoding: "gzip", MaxRequestBodySize: 1_048_576, LogFormat: "json", - MetricFormat: "prometheus", + MetricFormat: "otlp", SourceCategory: "", SourceName: "", SourceHost: "", Client: "otelcol", - GraphiteTemplate: "%{_metric_}", ClientConfig: confighttp.ClientConfig{ Auth: &configauth.Authentication{ diff --git a/exporter/sumologicexporter/fields.go b/exporter/sumologicexporter/fields.go index f468ff24ea11..410059dabb6a 100644 --- a/exporter/sumologicexporter/fields.go +++ b/exporter/sumologicexporter/fields.go @@ -23,6 +23,10 @@ func newFields(attrMap pcommon.Map) fields { } } +func (f fields) isInitialized() bool { + return f.initialized +} + // string returns fields as ordered key=value string with `, ` as separator func (f fields) string() string { if !f.initialized { diff --git a/exporter/sumologicexporter/graphite_formatter.go b/exporter/sumologicexporter/graphite_formatter.go deleted file mode 100644 index 3d3c605834ef..000000000000 --- a/exporter/sumologicexporter/graphite_formatter.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter" - -import ( - "fmt" - "regexp" - "strings" - "time" - - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" -) - -type graphiteFormatter struct { - template sourceFormat - replacer *strings.Replacer -} - -const ( - graphiteMetricNamePlaceholder = "_metric_" -) - -// newGraphiteFormatter creates new formatter for given SourceFormat template -func newGraphiteFormatter(template string) graphiteFormatter { - r := regexp.MustCompile(sourceRegex) - - sf := newSourceFormat(r, template) - - return graphiteFormatter{ - template: sf, - replacer: strings.NewReplacer(`.`, `_`, ` `, `_`), - } -} - -// escapeGraphiteString replaces dot and space using replacer, -// as dot is special character for graphite format -func (gf *graphiteFormatter) escapeGraphiteString(value string) string { - return gf.replacer.Replace(value) -} - -// format returns metric name basing on template for given fields nas metric name -func (gf *graphiteFormatter) format(f fields, metricName string) string { - s := gf.template - labels := make([]any, 0, len(s.matches)) - - for _, matchset := range s.matches { - if matchset == graphiteMetricNamePlaceholder { - labels = append(labels, gf.escapeGraphiteString(metricName)) - } else { - attr, ok := f.orig.Get(matchset) - var value string - if ok { - value = attr.AsString() - } else { - value = "" - } - labels = append(labels, gf.escapeGraphiteString(value)) - } - } - - return fmt.Sprintf(s.template, labels...) -} - -// numberRecord converts NumberDataPoint to graphite metric string -// with additional information from fields -func (gf *graphiteFormatter) numberRecord(fs fields, name string, dataPoint pmetric.NumberDataPoint) string { - switch dataPoint.ValueType() { - case pmetric.NumberDataPointValueTypeDouble: - return fmt.Sprintf("%s %g %d", - gf.format(fs, name), - dataPoint.DoubleValue(), - dataPoint.Timestamp()/pcommon.Timestamp(time.Second), - ) - case pmetric.NumberDataPointValueTypeInt: - return fmt.Sprintf("%s %d %d", - gf.format(fs, name), - dataPoint.IntValue(), - dataPoint.Timestamp()/pcommon.Timestamp(time.Second), - ) - case pmetric.NumberDataPointValueTypeEmpty: - return "" - } - return "" -} - -// metric2String returns stringified metricPair -func (gf *graphiteFormatter) metric2String(record metricPair) string { - var nextLines []string - fs := newFields(record.attributes) - name := record.metric.Name() - //exhaustive:enforce - switch record.metric.Type() { - case pmetric.MetricTypeGauge: - dps := record.metric.Gauge().DataPoints() - nextLines = make([]string, 0, dps.Len()) - for i := 0; i < dps.Len(); i++ { - nextLines = append(nextLines, gf.numberRecord(fs, name, dps.At(i))) - } - case pmetric.MetricTypeSum: - dps := record.metric.Sum().DataPoints() - nextLines = make([]string, 0, dps.Len()) - for i := 0; i < dps.Len(); i++ { - nextLines = append(nextLines, gf.numberRecord(fs, name, dps.At(i))) - } - // Skip complex metrics - case pmetric.MetricTypeHistogram: - case pmetric.MetricTypeSummary: - case pmetric.MetricTypeEmpty: - case pmetric.MetricTypeExponentialHistogram: - } - - return strings.Join(nextLines, "\n") -} diff --git a/exporter/sumologicexporter/graphite_formatter_test.go b/exporter/sumologicexporter/graphite_formatter_test.go deleted file mode 100644 index 0b2d27971415..000000000000 --- a/exporter/sumologicexporter/graphite_formatter_test.go +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package sumologicexporter - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestEscapeGraphiteString(t *testing.T) { - gf := newGraphiteFormatter("%{k8s.cluster}.%{k8s.namespace}.%{k8s.pod}.%{_metric_}") - - value := gf.escapeGraphiteString("this.is_example&metric.value") - expected := "this_is_example&metric_value" - - assert.Equal(t, expected, value) -} - -func TestGraphiteFormat(t *testing.T) { - gf := newGraphiteFormatter("%{k8s.cluster}.%{k8s.namespace}.%{k8s.pod}.%{_metric_}") - - fs := fieldsFromMap(map[string]string{ - "k8s.cluster": "test_cluster", - "k8s.namespace": "sumologic", - "k8s.pod": "example_pod", - }) - - expected := "test_cluster.sumologic.example_pod.test_metric" - result := gf.format(fs, "test_metric") - - assert.Equal(t, expected, result) -} - -func TestGraphiteMetricTypeIntGauge(t *testing.T) { - gf := newGraphiteFormatter("%{cluster}.%{namespace}.%{pod}.%{_metric_}") - - metric := exampleIntGaugeMetric() - metric.attributes.PutStr("cluster", "my_cluster") - metric.attributes.PutStr("namespace", "default") - metric.attributes.PutStr("pod", "some pod") - - result := gf.metric2String(metric) - expected := `my_cluster.default.some_pod.gauge_metric_name 124 1608124661 -my_cluster.default.some_pod.gauge_metric_name 245 1608124662` - assert.Equal(t, expected, result) -} - -func TestGraphiteMetricTypeDoubleGauge(t *testing.T) { - gf := newGraphiteFormatter("%{cluster}.%{namespace}.%{pod}.%{_metric_}") - - metric := exampleDoubleGaugeMetric() - metric.attributes.PutStr("cluster", "my_cluster") - metric.attributes.PutStr("namespace", "default") - metric.attributes.PutStr("pod", "some pod") - - result := gf.metric2String(metric) - expected := `my_cluster.default.some_pod.gauge_metric_name_double_test 33.4 1608124661 -my_cluster.default.some_pod.gauge_metric_name_double_test 56.8 1608124662` - assert.Equal(t, expected, result) -} - -func TestGraphiteNoattribute(t *testing.T) { - gf := newGraphiteFormatter("%{cluster}.%{namespace}.%{pod}.%{_metric_}") - - metric := exampleDoubleGaugeMetric() - metric.attributes.PutStr("cluster", "my_cluster") - metric.attributes.PutStr("pod", "some pod") - - result := gf.metric2String(metric) - expected := `my_cluster..some_pod.gauge_metric_name_double_test 33.4 1608124661 -my_cluster..some_pod.gauge_metric_name_double_test 56.8 1608124662` - assert.Equal(t, expected, result) -} - -func TestGraphiteMetricTypeIntSum(t *testing.T) { - gf := newGraphiteFormatter("%{cluster}.%{namespace}.%{pod}.%{_metric_}") - - metric := exampleIntSumMetric() - metric.attributes.PutStr("cluster", "my_cluster") - metric.attributes.PutStr("namespace", "default") - metric.attributes.PutStr("pod", "some pod") - - result := gf.metric2String(metric) - expected := `my_cluster.default.some_pod.sum_metric_int_test 45 1608124444 -my_cluster.default.some_pod.sum_metric_int_test 1238 1608124699` - assert.Equal(t, expected, result) -} - -func TestGraphiteMetricTypeDoubleSum(t *testing.T) { - gf := newGraphiteFormatter("%{cluster}.%{namespace}.%{pod}.%{_metric_}") - - metric := exampleDoubleSumMetric() - metric.attributes.PutStr("cluster", "my_cluster") - metric.attributes.PutStr("namespace", "default") - metric.attributes.PutStr("pod", "some pod") - - result := gf.metric2String(metric) - expected := `my_cluster.default.some_pod.sum_metric_double_test 45.6 1618124444 -my_cluster.default.some_pod.sum_metric_double_test 1238.1 1608424699` - assert.Equal(t, expected, result) -} - -func TestGraphiteMetricTypeSummary(t *testing.T) { - gf := newGraphiteFormatter("%{cluster}.%{namespace}.%{pod}.%{_metric_}") - - metric := exampleSummaryMetric() - metric.attributes.PutStr("cluster", "my_cluster") - metric.attributes.PutStr("namespace", "default") - metric.attributes.PutStr("pod", "some pod") - - result := gf.metric2String(metric) - expected := `` - assert.Equal(t, expected, result) - - metric = buildExampleSummaryMetric(false) - result = gf.metric2String(metric) - assert.Equal(t, expected, result) -} - -func TestGraphiteMetricTypeHistogram(t *testing.T) { - gf := newGraphiteFormatter("%{cluster}.%{namespace}.%{pod}.%{_metric_}") - - metric := exampleHistogramMetric() - metric.attributes.PutStr("cluster", "my_cluster") - metric.attributes.PutStr("namespace", "default") - metric.attributes.PutStr("pod", "some pod") - - result := gf.metric2String(metric) - expected := `` - assert.Equal(t, expected, result) - - metric = buildExampleHistogramMetric(false) - result = gf.metric2String(metric) - assert.Equal(t, expected, result) -} diff --git a/exporter/sumologicexporter/otlp_test.go b/exporter/sumologicexporter/otlp_test.go index f80395efdece..59ced8679518 100644 --- a/exporter/sumologicexporter/otlp_test.go +++ b/exporter/sumologicexporter/otlp_test.go @@ -18,9 +18,7 @@ const ( ) func TestHistogramDecomposeNoHistogram(t *testing.T) { - mp := exampleIntGaugeMetric() - metric := mp.metric - resourceAttributes := mp.attributes + metric, resourceAttributes := exampleIntGaugeMetric() metrics := pmetric.NewMetrics() resourceAttributes.CopyTo(metrics.ResourceMetrics().AppendEmpty().Resource().Attributes()) metric.MoveTo(metrics.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()) diff --git a/exporter/sumologicexporter/prometheus_formatter_test.go b/exporter/sumologicexporter/prometheus_formatter_test.go index 3a01bb711355..dbb593df5e62 100644 --- a/exporter/sumologicexporter/prometheus_formatter_test.go +++ b/exporter/sumologicexporter/prometheus_formatter_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" ) func TestSanitizeKey(t *testing.T) { @@ -31,16 +32,16 @@ func TestSanitizeValue(t *testing.T) { func TestTags2StringNoLabels(t *testing.T) { f := newPrometheusFormatter() - mp := exampleIntMetric() - mp.attributes.Clear() - assert.Equal(t, prometheusTags(""), f.tags2String(mp.attributes, pcommon.NewMap())) + _, attributes := exampleIntMetric() + attributes.Clear() + assert.Equal(t, prometheusTags(""), f.tags2String(attributes, pcommon.NewMap())) } func TestTags2String(t *testing.T) { f := newPrometheusFormatter() - mp := exampleIntMetric() - mp.attributes.PutInt("int", 200) + _, attributes := exampleIntMetric() + attributes.PutInt("int", 200) labels := pcommon.NewMap() labels.PutInt("l_int", 200) @@ -49,23 +50,23 @@ func TestTags2String(t *testing.T) { assert.Equal( t, prometheusTags(`{test="test_value",test2="second_value",int="200",l_int="200",l_str="two"}`), - f.tags2String(mp.attributes, labels), + f.tags2String(attributes, labels), ) } func TestTags2StringNoAttributes(t *testing.T) { f := newPrometheusFormatter() - mp := exampleIntMetric() - mp.attributes.Clear() + _, attributes := exampleIntMetric() + attributes.Clear() assert.Equal(t, prometheusTags(""), f.tags2String(pcommon.NewMap(), pcommon.NewMap())) } func TestPrometheusMetricDataTypeIntGauge(t *testing.T) { f := newPrometheusFormatter() - mp := exampleIntGaugeMetric() + metric, attributes := exampleIntGaugeMetric() - result := f.metric2String(mp.metric, mp.attributes) + result := f.metric2String(metric, attributes) expected := `gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` assert.Equal(t, expected, result) @@ -73,9 +74,9 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1 func TestPrometheusMetricDataTypeDoubleGauge(t *testing.T) { f := newPrometheusFormatter() - mp := exampleDoubleGaugeMetric() + metric, attributes := exampleDoubleGaugeMetric() - result := f.metric2String(mp.metric, mp.attributes) + result := f.metric2String(metric, attributes) expected := `gauge_metric_name_double_test{foo="bar",local_name="156720",endpoint="http://example_url"} 33.4 1608124661169 gauge_metric_name_double_test{foo="bar",local_name="156155",endpoint="http://another_url"} 56.8 1608124662186` assert.Equal(t, expected, result) @@ -83,9 +84,9 @@ gauge_metric_name_double_test{foo="bar",local_name="156155",endpoint="http://ano func TestPrometheusMetricDataTypeIntSum(t *testing.T) { f := newPrometheusFormatter() - mp := exampleIntSumMetric() + metric, attributes := exampleIntSumMetric() - result := f.metric2String(mp.metric, mp.attributes) + result := f.metric2String(metric, attributes) expected := `sum_metric_int_test{foo="bar",name="156720",address="http://example_url"} 45 1608124444169 sum_metric_int_test{foo="bar",name="156155",address="http://another_url"} 1238 1608124699186` assert.Equal(t, expected, result) @@ -93,9 +94,9 @@ sum_metric_int_test{foo="bar",name="156155",address="http://another_url"} 1238 1 func TestPrometheusMetricDataTypeDoubleSum(t *testing.T) { f := newPrometheusFormatter() - mp := exampleDoubleSumMetric() + metric, attributes := exampleDoubleSumMetric() - result := f.metric2String(mp.metric, mp.attributes) + result := f.metric2String(metric, attributes) expected := `sum_metric_double_test{foo="bar",pod_name="lorem",namespace="default"} 45.6 1618124444169 sum_metric_double_test{foo="bar",pod_name="opsum",namespace="kube-config"} 1238.1 1608424699186` assert.Equal(t, expected, result) @@ -103,9 +104,9 @@ sum_metric_double_test{foo="bar",pod_name="opsum",namespace="kube-config"} 1238. func TestPrometheusMetricDataTypeSummary(t *testing.T) { f := newPrometheusFormatter() - mp := exampleSummaryMetric() + metric, attributes := exampleSummaryMetric() - result := f.metric2String(mp.metric, mp.attributes) + result := f.metric2String(metric, attributes) expected := `summary_metric_double_test{foo="bar",quantile="0.6",pod_name="dolor",namespace="sumologic"} 0.7 1618124444169 summary_metric_double_test{foo="bar",quantile="2.6",pod_name="dolor",namespace="sumologic"} 4 1618124444169 summary_metric_double_test_sum{foo="bar",pod_name="dolor",namespace="sumologic"} 45.6 1618124444169 @@ -117,9 +118,9 @@ summary_metric_double_test_count{foo="bar",pod_name="sit",namespace="main"} 7 16 func TestPrometheusMetricDataTypeHistogram(t *testing.T) { f := newPrometheusFormatter() - mp := exampleHistogramMetric() + metric, attributes := exampleHistogramMetric() - result := f.metric2String(mp.metric, mp.attributes) + result := f.metric2String(metric, attributes) expected := `histogram_metric_double_test_bucket{bar="foo",le="0.1",container="dolor",branch="sumologic"} 0 1618124444169 histogram_metric_double_test_bucket{bar="foo",le="0.2",container="dolor",branch="sumologic"} 12 1618124444169 histogram_metric_double_test_bucket{bar="foo",le="0.5",container="dolor",branch="sumologic"} 19 1618124444169 @@ -142,7 +143,7 @@ histogram_metric_double_test_count{bar="foo",container="sit",branch="main"} 98 1 func TestEmptyPrometheusMetrics(t *testing.T) { type testCase struct { name string - metricFunc func(fillData bool) metricPair + metricFunc func(fillData bool) (pmetric.Metric, pcommon.Map) expected string } @@ -183,8 +184,7 @@ func TestEmptyPrometheusMetrics(t *testing.T) { t.Run(tt.name, func(t *testing.T) { f := newPrometheusFormatter() - mp := tt.metricFunc(false) - result := f.metric2String(mp.metric, mp.attributes) + result := f.metric2String(tt.metricFunc(false)) assert.Equal(t, tt.expected, result) }) } @@ -193,10 +193,10 @@ func TestEmptyPrometheusMetrics(t *testing.T) { func Benchmark_PrometheusFormatter_Metric2String(b *testing.B) { f := newPrometheusFormatter() - mp := buildExampleHistogramMetric(true) + metric, attributes := buildExampleHistogramMetric(true) b.ResetTimer() for i := 0; i < b.N; i++ { - _ = f.metric2String(mp.metric, mp.attributes) + _ = f.metric2String(metric, attributes) } } diff --git a/exporter/sumologicexporter/sender.go b/exporter/sumologicexporter/sender.go index b1ea5f70d332..9e7e6d132b14 100644 --- a/exporter/sumologicexporter/sender.go +++ b/exporter/sumologicexporter/sender.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "net/http" + "reflect" "strings" "time" @@ -24,6 +25,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter/internal/observability" ) +var ( + metricsMarshaler = pmetric.ProtoMarshaler{} +) + // metricPair represents information required to send one metric to the Sumo Logic type metricPair struct { attributes pcommon.Map @@ -43,6 +48,12 @@ func newCountingReader(records int) *countingReader { } } +// withBytes sets up reader to read from bytes data +func (c *countingReader) withBytes(data []byte) *countingReader { + c.reader = bytes.NewReader(data) + return c +} + // withString sets up reader to read from string data func (c *countingReader) withString(data string) *countingReader { c.reader = strings.NewReader(data) @@ -100,7 +111,6 @@ func (b *bodyBuilder) toCountingReader() *countingReader { type sender struct { logger *zap.Logger logBuffer []plog.LogRecord - metricBuffer []metricPair config *Config client *http.Client filter filter @@ -110,7 +120,6 @@ type sender struct { dataURLMetrics string dataURLLogs string dataURLTraces string - graphiteFormatter graphiteFormatter id component.ID } @@ -133,8 +142,7 @@ const ( contentTypeLogs string = "application/x-www-form-urlencoded" contentTypePrometheus string = "application/vnd.sumologic.prometheus" - contentTypeCarbon2 string = "application/vnd.sumologic.carbon2" - contentTypeGraphite string = "application/vnd.sumologic.graphite" + contentTypeOTLP string = "application/x-protobuf" contentEncodingGzip string = "gzip" contentEncodingDeflate string = "deflate" @@ -151,7 +159,6 @@ func newSender( metricsURL string, logsURL string, tracesURL string, - gf graphiteFormatter, id component.ID, ) *sender { return &sender{ @@ -165,7 +172,6 @@ func newSender( dataURLMetrics: metricsURL, dataURLLogs: logsURL, dataURLTraces: tracesURL, - graphiteFormatter: gf, id: id, } } @@ -304,17 +310,18 @@ func (s *sender) handleReceiverResponse(resp *http.Response) error { func (s *sender) createRequest(ctx context.Context, pipeline PipelineType, data io.Reader) (*http.Request, error) { var url string + var err error switch pipeline { case MetricsPipeline: url = s.dataURLMetrics case LogsPipeline: url = s.dataURLLogs + data, err = s.compressor.compress(data) default: return nil, fmt.Errorf("unknown pipeline type: %s", pipeline) } - data, err := s.compressor.compress(data) if err != nil { return nil, err } @@ -394,60 +401,117 @@ func (s *sender) sendLogs(ctx context.Context, flds fields) ([]plog.LogRecord, e return droppedRecords, errors.Join(errs...) } -// sendMetrics sends metrics in right format basing on the s.config.MetricFormat -func (s *sender) sendMetrics(ctx context.Context, flds fields) ([]metricPair, error) { +// sendNonOTLPMetrics sends metrics in right format basing on the s.config.MetricFormat +func (s *sender) sendNonOTLPMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, []error) { + if s.config.MetricFormat == OTLPMetricFormat { + return md, []error{fmt.Errorf("attempting to send OTLP metrics as non-OTLP data")} + } + var ( - body = newBodyBuilder() - errs []error - droppedRecords []metricPair - currentRecords []metricPair + body = newBodyBuilder() + errs []error + currentResources []pmetric.ResourceMetrics + flds fields ) - for _, record := range s.metricBuffer { - var formattedLine string + rms := md.ResourceMetrics() + droppedMetrics := pmetric.NewMetrics() + for i := 0; i < rms.Len(); i++ { + rm := rms.At(i) + flds = newFields(rm.Resource().Attributes()) + sms := rm.ScopeMetrics() + + // generally speaking, it's fine to send multiple ResourceMetrics in a single request + // the only exception is if the computed source headers are different, as those as unique per-request + // so we check if the headers are different here and send what we have if they are + if i > 0 { + currentSourceHeaders := getSourcesHeaders(flds) + previousFields := newFields(rms.At(i - 1).Resource().Attributes()) + previousSourceHeaders := getSourcesHeaders(previousFields) + if !reflect.DeepEqual(previousSourceHeaders, currentSourceHeaders) && body.Len() > 0 { + + if err := s.send(ctx, MetricsPipeline, body.toCountingReader(), previousFields); err != nil { + errs = append(errs, err) + for _, resource := range currentResources { + resource.CopyTo(droppedMetrics.ResourceMetrics().AppendEmpty()) + } + } + body.Reset() + currentResources = currentResources[:0] + } + } + + // transform the metrics into formatted lines ready to be sent + var formattedLines []string var err error + for i := 0; i < sms.Len(); i++ { + sm := sms.At(i) - switch s.config.MetricFormat { - case PrometheusFormat: - formattedLine = s.prometheusFormatter.metric2String(record.metric, record.attributes) - case Carbon2Format: - formattedLine = carbon2Metric2String(record) - case GraphiteFormat: - formattedLine = s.graphiteFormatter.metric2String(record) - default: - err = fmt.Errorf("unexpected metric format: %s", s.config.MetricFormat) - } + for j := 0; j < sm.Metrics().Len(); j++ { + m := sm.Metrics().At(j) - if err != nil { - droppedRecords = append(droppedRecords, record) - errs = append(errs, err) - continue + var formattedLine string + + switch s.config.MetricFormat { + case PrometheusFormat: + formattedLine = s.prometheusFormatter.metric2String(m, rm.Resource().Attributes()) + default: + return md, []error{fmt.Errorf("unexpected metric format: %s", s.config.MetricFormat)} + } + + formattedLines = append(formattedLines, formattedLine) + } } - sent, err := s.appendAndMaybeSend(ctx, []string{formattedLine}, MetricsPipeline, &body, flds) + sent, err := s.appendAndMaybeSend(ctx, formattedLines, MetricsPipeline, &body, flds) if err != nil { errs = append(errs, err) if sent { - droppedRecords = append(droppedRecords, currentRecords...) + // failed at sending, add the resource to the dropped metrics + // move instead of copy here to avoid duplicating data in memory on failure + for _, resource := range currentResources { + resource.CopyTo(droppedMetrics.ResourceMetrics().AppendEmpty()) + } } } - // If data was sent, cleanup the currentTimeSeries counter + // If data was sent, cleanup the currentResources slice if sent { - currentRecords = currentRecords[:0] + currentResources = currentResources[:0] } - currentRecords = append(currentRecords, record) + currentResources = append(currentResources, rm) + } if body.Len() > 0 { if err := s.send(ctx, MetricsPipeline, body.toCountingReader(), flds); err != nil { errs = append(errs, err) - droppedRecords = append(droppedRecords, currentRecords...) + for _, resource := range currentResources { + resource.CopyTo(droppedMetrics.ResourceMetrics().AppendEmpty()) + } } } - return droppedRecords, errors.Join(errs...) + return droppedMetrics, errs +} + +func (s *sender) sendOTLPMetrics(ctx context.Context, md pmetric.Metrics) error { + rms := md.ResourceMetrics() + if rms.Len() == 0 { + s.logger.Debug("there are no metrics to send, moving on") + return nil + } + if s.config.DecomposeOtlpHistograms { + md = decomposeHistograms(md) + } + + body, err := metricsMarshaler.MarshalMetrics(md) + if err != nil { + return err + } + + return s.send(ctx, MetricsPipeline, newCountingReader(md.DataPointCount()).withBytes(body), fields{}) } // appendAndMaybeSend appends line to the request body that will be sent and sends @@ -506,30 +570,6 @@ func (s *sender) countLogs() int { return len(s.logBuffer) } -// cleanMetricBuffer zeroes metricBuffer -func (s *sender) cleanMetricBuffer() { - s.metricBuffer = (s.metricBuffer)[:0] -} - -// batchMetric adds metric to the metricBuffer and flushes them if metricBuffer is full to avoid overflow -// returns list of metric records which were not sent successfully -func (s *sender) batchMetric(ctx context.Context, metric metricPair, metadata fields) ([]metricPair, error) { - s.metricBuffer = append(s.metricBuffer, metric) - - if s.countMetrics() >= maxBufferSize { - dropped, err := s.sendMetrics(ctx, metadata) - s.cleanMetricBuffer() - return dropped, err - } - - return nil, nil -} - -// countMetrics returns number of metrics in metricBuffer -func (s *sender) countMetrics() int { - return len(s.metricBuffer) -} - func (s *sender) addSourcesHeaders(req *http.Request, flds fields) { if s.sources.host.isSet() { req.Header.Add(headerHost, s.sources.host.format(flds)) @@ -542,6 +582,34 @@ func (s *sender) addSourcesHeaders(req *http.Request, flds fields) { if s.sources.category.isSet() { req.Header.Add(headerCategory, s.sources.category.format(flds)) } + + sourceHeaderValues := getSourcesHeaders(flds) + + for headerName, headerValue := range sourceHeaderValues { + req.Header.Add(headerName, headerValue) + } +} + +func getSourcesHeaders(flds fields) map[string]string { + sourceHeaderValues := map[string]string{} + if !flds.isInitialized() { + return sourceHeaderValues + } + + attrs := flds.orig + + if v, ok := attrs.Get(attributeKeySourceHost); ok { + sourceHeaderValues[headerHost] = v.AsString() + } + + if v, ok := attrs.Get(attributeKeySourceName); ok { + sourceHeaderValues[headerName] = v.AsString() + } + + if v, ok := attrs.Get(attributeKeySourceCategory); ok { + sourceHeaderValues[headerCategory] = v.AsString() + } + return sourceHeaderValues } func addLogsHeaders(req *http.Request, _ LogFormatType, flds fields) { @@ -556,10 +624,8 @@ func addMetricsHeaders(req *http.Request, mf MetricFormatType) error { switch mf { case PrometheusFormat: req.Header.Add(headerContentType, contentTypePrometheus) - case Carbon2Format: - req.Header.Add(headerContentType, contentTypeCarbon2) - case GraphiteFormat: - req.Header.Add(headerContentType, contentTypeGraphite) + case OTLPMetricFormat: + req.Header.Add(headerContentType, contentTypeOTLP) default: return fmt.Errorf("unsupported metrics format: %s", mf) } diff --git a/exporter/sumologicexporter/sender_test.go b/exporter/sumologicexporter/sender_test.go index e84b42b38db5..5f42d2421b44 100644 --- a/exporter/sumologicexporter/sender_test.go +++ b/exporter/sumologicexporter/sender_test.go @@ -18,8 +18,10 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" ) @@ -30,18 +32,19 @@ type senderTest struct { } func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http.Request)) *senderTest { - reqCounter := &atomic.Int32{} + var reqCounter int32 // generate a test server so we can capture and inspect the request testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if len(cb) == 0 { return } - if c := int(reqCounter.Load()); assert.Greater(t, len(cb), c) { + if c := int(atomic.LoadInt32(&reqCounter)); assert.Greater(t, len(cb), c) { cb[c](w, req) - reqCounter.Add(1) + atomic.AddInt32(&reqCounter, 1) } })) + t.Cleanup(func() { testServer.Close() }) cfg := &Config{ ClientConfig: confighttp.ClientConfig{ @@ -49,7 +52,7 @@ func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http. Timeout: defaultTimeout, }, LogFormat: "text", - MetricFormat: "carbon2", + MetricFormat: "otlp", Client: "otelcol", MaxRequestBodySize: 20_971_520, } @@ -64,8 +67,6 @@ func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http. pf := newPrometheusFormatter() - gf := newGraphiteFormatter(DefaultGraphiteTemplate) - err = exp.start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -92,7 +93,6 @@ func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http. testServer.URL, testServer.URL, testServer.URL, - gf, component.ID{}, ), } @@ -614,53 +614,125 @@ func TestSendMetrics(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ func(_ http.ResponseWriter, req *http.Request) { body := extractBody(t, req) - expected := `test.metric.data{test="test_value",test2="second_value"} 14500 1605534165000 -gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 -gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` + expected := `` + + `test.metric.data{test="test_value",test2="second_value"} 14500 1605534165000` + "\n" + + `gauge_metric_name{test="test_value",test2="second_value",remote_name="156920",url="http://example_url"} 124 1608124661166` + "\n" + + `gauge_metric_name{test="test_value",test2="second_value",remote_name="156955",url="http://another_url"} 245 1608124662166` assert.Equal(t, expected, body) assert.Equal(t, "otelcol", req.Header.Get("X-Sumo-Client")) assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) }, }) - defer func() { test.srv.Close() }() - flds := fieldsFromMap(map[string]string{ - "key1": "value", - "key2": "value2", + + test.s.config.MetricFormat = PrometheusFormat + + metricSum, attrs := exampleIntMetric() + metricGauge, _ := exampleIntGaugeMetric() + metrics := metricAndAttrsToPdataMetrics( + attrs, + metricSum, metricGauge, + ) + metrics.MarkReadOnly() + + _, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics) + assert.Empty(t, errs) +} + +func TestSendMetricsSplit(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(_ http.ResponseWriter, req *http.Request) { + body := extractBody(t, req) + expected := `` + + `test.metric.data{test="test_value",test2="second_value"} 14500 1605534165000` + "\n" + + `gauge_metric_name{test="test_value",test2="second_value",remote_name="156920",url="http://example_url"} 124 1608124661166` + "\n" + + `gauge_metric_name{test="test_value",test2="second_value",remote_name="156955",url="http://another_url"} 245 1608124662166` + assert.Equal(t, expected, body) + assert.Equal(t, "otelcol", req.Header.Get("X-Sumo-Client")) + assert.Equal(t, "application/vnd.sumologic.prometheus", req.Header.Get("Content-Type")) + }, }) test.s.config.MetricFormat = PrometheusFormat - test.s.metricBuffer = []metricPair{ - exampleIntMetric(), - exampleIntGaugeMetric(), - } - _, err := test.s.sendMetrics(context.Background(), flds) + + metricSum, attrs := exampleIntMetric() + metricGauge, _ := exampleIntGaugeMetric() + metrics := metricAndAttrsToPdataMetrics( + attrs, + metricSum, metricGauge, + ) + metrics.MarkReadOnly() + + _, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics) + assert.Empty(t, errs) +} + +func TestSendOTLPHistogram(t *testing.T) { + test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + func(_ http.ResponseWriter, req *http.Request) { + unmarshaler := pmetric.ProtoUnmarshaler{} + body, err := io.ReadAll(req.Body) + require.NoError(t, err) + metrics, err := unmarshaler.UnmarshalMetrics(body) + require.NoError(t, err) + assert.Equal(t, 3, metrics.MetricCount()) + assert.Equal(t, 16, metrics.DataPointCount()) + }, + }) + + defer func() { test.srv.Close() }() + + test.s.config.DecomposeOtlpHistograms = true + test.s.config.MetricFormat = OTLPMetricFormat + + metricHistogram, attrs := exampleHistogramMetric() + + metrics := pmetric.NewMetrics() + + rms := metrics.ResourceMetrics().AppendEmpty() + attrs.CopyTo(rms.Resource().Attributes()) + metricHistogram.CopyTo(rms.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()) + metrics.MarkReadOnly() + + err := test.s.sendOTLPMetrics(context.Background(), metrics) assert.NoError(t, err) } -func TestSendMetricsSplit(t *testing.T) { +func TestSendMetricsSplitBySource(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ func(_ http.ResponseWriter, req *http.Request) { body := extractBody(t, req) - expected := `test.metric.data{test="test_value",test2="second_value"} 14500 1605534165000` + expected := `test.metric.data{test="test_value",test2="second_value",_sourceHost="value1"} 14500 1605534165000` assert.Equal(t, expected, body) }, func(_ http.ResponseWriter, req *http.Request) { body := extractBody(t, req) - expected := `gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 -gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` + expected := `` + + `gauge_metric_name{test="test_value",test2="second_value",_sourceHost="value2",remote_name="156920",url="http://example_url"} 124 1608124661166` + "\n" + + `gauge_metric_name{test="test_value",test2="second_value",_sourceHost="value2",remote_name="156955",url="http://another_url"} 245 1608124662166` assert.Equal(t, expected, body) }, }) - defer func() { test.srv.Close() }() - test.s.config.MaxRequestBodySize = 10 test.s.config.MetricFormat = PrometheusFormat - test.s.metricBuffer = []metricPair{ - exampleIntMetric(), - exampleIntGaugeMetric(), - } - _, err := test.s.sendMetrics(context.Background(), newFields(pcommon.NewMap())) - assert.NoError(t, err) + metricSum, attrs := exampleIntMetric() + metricGauge, _ := exampleIntGaugeMetric() + + metrics := pmetric.NewMetrics() + metrics.ResourceMetrics().EnsureCapacity(2) + + rmsSum := metrics.ResourceMetrics().AppendEmpty() + attrs.CopyTo(rmsSum.Resource().Attributes()) + rmsSum.Resource().Attributes().PutStr("_sourceHost", "value1") + metricSum.CopyTo(rmsSum.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()) + + rmsGauge := metrics.ResourceMetrics().AppendEmpty() + attrs.CopyTo(rmsGauge.Resource().Attributes()) + rmsGauge.Resource().Attributes().PutStr("_sourceHost", "value2") + metricGauge.CopyTo(rmsGauge.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()) + metrics.MarkReadOnly() + + _, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics) + assert.Empty(t, errs) } func TestSendMetricsSplitFailedOne(t *testing.T) { @@ -674,22 +746,34 @@ func TestSendMetricsSplitFailedOne(t *testing.T) { }, func(_ http.ResponseWriter, req *http.Request) { body := extractBody(t, req) - expected := `gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 -gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` + expected := `` + + `gauge_metric_name{test="test_value",test2="second_value",remote_name="156920",url="http://example_url"} 124 1608124661166` + "\n" + + `gauge_metric_name{test="test_value",test2="second_value",remote_name="156955",url="http://another_url"} 245 1608124662166` assert.Equal(t, expected, body) }, }) - defer func() { test.srv.Close() }() test.s.config.MaxRequestBodySize = 10 test.s.config.MetricFormat = PrometheusFormat - test.s.metricBuffer = []metricPair{ - exampleIntMetric(), - exampleIntGaugeMetric(), - } - dropped, err := test.s.sendMetrics(context.Background(), newFields(pcommon.NewMap())) - assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error") - assert.Equal(t, test.s.metricBuffer[0:1], dropped) + metricSum, attrs := exampleIntMetric() + metricGauge, _ := exampleIntGaugeMetric() + metrics := pmetric.NewMetrics() + metrics.ResourceMetrics().EnsureCapacity(2) + + rmsSum := metrics.ResourceMetrics().AppendEmpty() + attrs.CopyTo(rmsSum.Resource().Attributes()) + metricSum.CopyTo(rmsSum.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()) + + rmsGauge := metrics.ResourceMetrics().AppendEmpty() + attrs.CopyTo(rmsGauge.Resource().Attributes()) + metricGauge.CopyTo(rmsGauge.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()) + metrics.MarkReadOnly() + + dropped, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics) + assert.Len(t, errs, 1) + assert.EqualError(t, errs[0], "failed sending data: status: 500 Internal Server Error") + require.Equal(t, 1, dropped.MetricCount()) + assert.Equal(t, dropped.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0), metricSum) } func TestSendMetricsSplitFailedAll(t *testing.T) { @@ -705,157 +789,70 @@ func TestSendMetricsSplitFailedAll(t *testing.T) { w.WriteHeader(404) body := extractBody(t, req) - expected := `gauge_metric_name{foo="bar",remote_name="156920",url="http://example_url"} 124 1608124661166 -gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1608124662166` + expected := `` + + `gauge_metric_name{test="test_value",test2="second_value",remote_name="156920",url="http://example_url"} 124 1608124661166` + "\n" + + `gauge_metric_name{test="test_value",test2="second_value",remote_name="156955",url="http://another_url"} 245 1608124662166` assert.Equal(t, expected, body) }, }) - defer func() { test.srv.Close() }() test.s.config.MaxRequestBodySize = 10 test.s.config.MetricFormat = PrometheusFormat - test.s.metricBuffer = []metricPair{ - exampleIntMetric(), - exampleIntGaugeMetric(), - } - dropped, err := test.s.sendMetrics(context.Background(), newFields(pcommon.NewMap())) + metricSum, attrs := exampleIntMetric() + metricGauge, _ := exampleIntGaugeMetric() + metrics := pmetric.NewMetrics() + metrics.ResourceMetrics().EnsureCapacity(2) + + rmsSum := metrics.ResourceMetrics().AppendEmpty() + attrs.CopyTo(rmsSum.Resource().Attributes()) + metricSum.CopyTo(rmsSum.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()) + + rmsGauge := metrics.ResourceMetrics().AppendEmpty() + attrs.CopyTo(rmsGauge.Resource().Attributes()) + metricGauge.CopyTo(rmsGauge.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()) + metrics.MarkReadOnly() + + dropped, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics) + assert.Len(t, errs, 2) assert.EqualError( t, - err, - "failed sending data: status: 500 Internal Server Error\nfailed sending data: status: 404 Not Found", + errs[0], + "failed sending data: status: 500 Internal Server Error", ) - assert.Equal(t, test.s.metricBuffer[0:2], dropped) + assert.EqualError( + t, + errs[1], + "failed sending data: status: 404 Not Found", + ) + require.Equal(t, 2, dropped.MetricCount()) + assert.Equal(t, dropped.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0), metricSum) + assert.Equal(t, dropped.ResourceMetrics().At(1).ScopeMetrics().At(0).Metrics().At(0), metricGauge) } func TestSendMetricsUnexpectedFormat(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ - func(_ http.ResponseWriter, _ *http.Request) { - }, - }) - defer func() { test.srv.Close() }() + // Expect no requestes + test := prepareSenderTest(t, nil) test.s.config.MetricFormat = "invalid" - metrics := []metricPair{ - exampleIntMetric(), - } - test.s.metricBuffer = metrics - - dropped, err := test.s.sendMetrics(context.Background(), newFields(pcommon.NewMap())) - assert.EqualError(t, err, "unexpected metric format: invalid") - assert.Equal(t, dropped, metrics) -} - -func TestMetricsBuffer(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) - defer func() { test.srv.Close() }() - - assert.Equal(t, test.s.countMetrics(), 0) - metrics := []metricPair{ - exampleIntMetric(), - exampleIntGaugeMetric(), - } - - droppedMetrics, err := test.s.batchMetric(context.Background(), metrics[0], newFields(pcommon.NewMap())) - require.NoError(t, err) - assert.Nil(t, droppedMetrics) - assert.Equal(t, 1, test.s.countMetrics()) - assert.Equal(t, metrics[0:1], test.s.metricBuffer) - - droppedMetrics, err = test.s.batchMetric(context.Background(), metrics[1], newFields(pcommon.NewMap())) - require.NoError(t, err) - assert.Nil(t, droppedMetrics) - assert.Equal(t, 2, test.s.countMetrics()) - assert.Equal(t, metrics, test.s.metricBuffer) - - test.s.cleanMetricBuffer() - assert.Equal(t, 0, test.s.countMetrics()) - assert.Equal(t, []metricPair{}, test.s.metricBuffer) -} - -func TestMetricsBufferOverflow(t *testing.T) { - t.Skip("Skip test due to prometheus format complexity. Execution can take over 30s") - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) - defer func() { test.srv.Close() }() - - test.s.config.ClientConfig.Endpoint = ":" - test.s.config.MetricFormat = PrometheusFormat - test.s.config.MaxRequestBodySize = 1024 * 1024 * 1024 * 1024 - metric := exampleIntMetric() - flds := newFields(pcommon.NewMap()) - - for test.s.countMetrics() < maxBufferSize-1 { - _, err := test.s.batchMetric(context.Background(), metric, flds) - require.NoError(t, err) - } - _, err := test.s.batchMetric(context.Background(), metric, flds) - assert.EqualError(t, err, `parse ":": missing protocol scheme`) - assert.Equal(t, 0, test.s.countMetrics()) -} - -func TestSendCarbon2Metrics(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ - func(_ http.ResponseWriter, req *http.Request) { - body := extractBody(t, req) - expected := `test=test_value test2=second_value _unit=m/s escape_me=:invalid_ metric=true metric=test.metric.data unit=bytes 14500 1605534165 -foo=bar metric=gauge_metric_name 124 1608124661 -foo=bar metric=gauge_metric_name 245 1608124662` - assert.Equal(t, expected, body) - assert.Equal(t, "otelcol", req.Header.Get("X-Sumo-Client")) - assert.Equal(t, "application/vnd.sumologic.carbon2", req.Header.Get("Content-Type")) - }, - }) - defer func() { test.srv.Close() }() - - test.s.config.MetricFormat = Carbon2Format - test.s.metricBuffer = []metricPair{ - exampleIntMetric(), - exampleIntGaugeMetric(), - } - - flds := fieldsFromMap(map[string]string{ - "key1": "value", - "key2": "value2", - }) + metricSum, attrs := exampleIntMetric() + metrics := metricAndAttrsToPdataMetrics(attrs, metricSum) + metrics.MarkReadOnly() - test.s.metricBuffer[0].attributes.PutStr("unit", "m/s") - test.s.metricBuffer[0].attributes.PutStr("escape me", "=invalid\n") - test.s.metricBuffer[0].attributes.PutBool("metric", true) - - _, err := test.s.sendMetrics(context.Background(), flds) - assert.NoError(t, err) + dropped, errs := test.s.sendNonOTLPMetrics(context.Background(), metrics) + assert.Len(t, errs, 1) + assert.EqualError(t, errs[0], "unexpected metric format: invalid") + require.Equal(t, 1, dropped.MetricCount()) + assert.Equal(t, dropped, metrics) } -func TestSendGraphiteMetrics(t *testing.T) { +func TestBadRequestCausesPermanentError(t *testing.T) { test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ - func(_ http.ResponseWriter, req *http.Request) { - body := extractBody(t, req) - expected := `test_metric_data.true.m/s 14500 1605534165 -gauge_metric_name.. 124 1608124661 -gauge_metric_name.. 245 1608124662` - assert.Equal(t, expected, body) - assert.Equal(t, "otelcol", req.Header.Get("X-Sumo-Client")) - assert.Equal(t, "application/vnd.sumologic.graphite", req.Header.Get("Content-Type")) + func(res http.ResponseWriter, _ *http.Request) { + res.WriteHeader(400) }, }) - defer func() { test.srv.Close() }() + test.s.config.MetricFormat = OTLPMetricFormat - gf := newGraphiteFormatter("%{_metric_}.%{metric}.%{unit}") - test.s.graphiteFormatter = gf - - test.s.config.MetricFormat = GraphiteFormat - test.s.metricBuffer = []metricPair{ - exampleIntMetric(), - exampleIntGaugeMetric(), - } - - flds := fieldsFromMap(map[string]string{ - "key1": "value", - "key2": "value2", - }) - - test.s.metricBuffer[0].attributes.PutStr("unit", "m/s") - test.s.metricBuffer[0].attributes.PutBool("metric", true) - - _, err := test.s.sendMetrics(context.Background(), flds) - assert.NoError(t, err) + err := test.s.send(context.Background(), MetricsPipeline, newCountingReader(0).withString("malformed-request"), fields{}) + assert.True(t, consumererror.IsPermanent(err), "A '400 Bad Request' response from the server should result in a permanent error") } diff --git a/exporter/sumologicexporter/test_data.go b/exporter/sumologicexporter/test_data.go deleted file mode 100644 index 320ef4f0af83..000000000000 --- a/exporter/sumologicexporter/test_data.go +++ /dev/null @@ -1,266 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter" - -import ( - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" -) - -func exampleIntMetric() metricPair { - return buildExampleIntMetric(true) -} - -func buildExampleIntMetric(fillData bool) metricPair { - metric := pmetric.NewMetric() - metric.SetName("test.metric.data") - metric.SetUnit("bytes") - metric.SetEmptySum() - - if fillData { - dp := metric.Sum().DataPoints().AppendEmpty() - dp.SetTimestamp(1605534165 * 1e9) - dp.SetIntValue(14500) - } - - attributes := pcommon.NewMap() - attributes.PutStr("test", "test_value") - attributes.PutStr("test2", "second_value") - - return metricPair{ - metric: metric, - attributes: attributes, - } -} - -func exampleIntGaugeMetric() metricPair { - return buildExampleIntGaugeMetric(true) -} - -func buildExampleIntGaugeMetric(fillData bool) metricPair { - metric := metricPair{ - attributes: pcommon.NewMap(), - metric: pmetric.NewMetric(), - } - - metric.metric.SetName("gauge_metric_name") - metric.metric.SetEmptyGauge() - - metric.attributes.PutStr("foo", "bar") - - if fillData { - dp := metric.metric.Gauge().DataPoints().AppendEmpty() - dp.Attributes().PutStr("remote_name", "156920") - dp.Attributes().PutStr("url", "http://example_url") - dp.SetIntValue(124) - dp.SetTimestamp(1608124661.166 * 1e9) - - dp = metric.metric.Gauge().DataPoints().AppendEmpty() - dp.Attributes().PutStr("remote_name", "156955") - dp.Attributes().PutStr("url", "http://another_url") - dp.SetIntValue(245) - dp.SetTimestamp(1608124662.166 * 1e9) - } - - return metric -} - -func exampleDoubleGaugeMetric() metricPair { - return buildExampleDoubleGaugeMetric(true) -} - -func buildExampleDoubleGaugeMetric(fillData bool) metricPair { - metric := metricPair{ - attributes: pcommon.NewMap(), - metric: pmetric.NewMetric(), - } - - metric.metric.SetEmptyGauge() - metric.metric.SetName("gauge_metric_name_double_test") - - metric.attributes.PutStr("foo", "bar") - - if fillData { - dp := metric.metric.Gauge().DataPoints().AppendEmpty() - dp.Attributes().PutStr("local_name", "156720") - dp.Attributes().PutStr("endpoint", "http://example_url") - dp.SetDoubleValue(33.4) - dp.SetTimestamp(1608124661.169 * 1e9) - - dp = metric.metric.Gauge().DataPoints().AppendEmpty() - dp.Attributes().PutStr("local_name", "156155") - dp.Attributes().PutStr("endpoint", "http://another_url") - dp.SetDoubleValue(56.8) - dp.SetTimestamp(1608124662.186 * 1e9) - } - - return metric -} - -func exampleIntSumMetric() metricPair { - return buildExampleIntSumMetric(true) -} - -func buildExampleIntSumMetric(fillData bool) metricPair { - metric := metricPair{ - attributes: pcommon.NewMap(), - metric: pmetric.NewMetric(), - } - - metric.metric.SetEmptySum() - metric.metric.SetName("sum_metric_int_test") - - metric.attributes.PutStr("foo", "bar") - - if fillData { - dp := metric.metric.Sum().DataPoints().AppendEmpty() - dp.Attributes().PutStr("name", "156720") - dp.Attributes().PutStr("address", "http://example_url") - dp.SetIntValue(45) - dp.SetTimestamp(1608124444.169 * 1e9) - - dp = metric.metric.Sum().DataPoints().AppendEmpty() - dp.Attributes().PutStr("name", "156155") - dp.Attributes().PutStr("address", "http://another_url") - dp.SetIntValue(1238) - dp.SetTimestamp(1608124699.186 * 1e9) - } - - return metric -} - -func exampleDoubleSumMetric() metricPair { - return buildExampleDoubleSumMetric(true) -} - -func buildExampleDoubleSumMetric(fillData bool) metricPair { - metric := metricPair{ - attributes: pcommon.NewMap(), - metric: pmetric.NewMetric(), - } - - metric.metric.SetEmptySum() - metric.metric.SetName("sum_metric_double_test") - - metric.attributes.PutStr("foo", "bar") - - if fillData { - dp := metric.metric.Sum().DataPoints().AppendEmpty() - dp.Attributes().PutStr("pod_name", "lorem") - dp.Attributes().PutStr("namespace", "default") - dp.SetDoubleValue(45.6) - dp.SetTimestamp(1618124444.169 * 1e9) - - dp = metric.metric.Sum().DataPoints().AppendEmpty() - dp.Attributes().PutStr("pod_name", "opsum") - dp.Attributes().PutStr("namespace", "kube-config") - dp.SetDoubleValue(1238.1) - dp.SetTimestamp(1608424699.186 * 1e9) - } - - return metric -} - -func exampleSummaryMetric() metricPair { - return buildExampleSummaryMetric(true) -} - -func buildExampleSummaryMetric(fillData bool) metricPair { - metric := metricPair{ - attributes: pcommon.NewMap(), - metric: pmetric.NewMetric(), - } - - metric.metric.SetEmptySummary() - metric.metric.SetName("summary_metric_double_test") - - metric.attributes.PutStr("foo", "bar") - - if fillData { - dp := metric.metric.Summary().DataPoints().AppendEmpty() - dp.Attributes().PutStr("pod_name", "dolor") - dp.Attributes().PutStr("namespace", "sumologic") - dp.SetSum(45.6) - dp.SetCount(3) - dp.SetTimestamp(1618124444.169 * 1e9) - - quantile := dp.QuantileValues().AppendEmpty() - quantile.SetQuantile(0.6) - quantile.SetValue(0.7) - - quantile = dp.QuantileValues().AppendEmpty() - quantile.SetQuantile(2.6) - quantile.SetValue(4) - - dp = metric.metric.Summary().DataPoints().AppendEmpty() - dp.Attributes().PutStr("pod_name", "sit") - dp.Attributes().PutStr("namespace", "main") - dp.SetSum(1238.1) - dp.SetCount(7) - dp.SetTimestamp(1608424699.186 * 1e9) - } - - return metric -} - -func exampleHistogramMetric() metricPair { - return buildExampleHistogramMetric(true) -} - -func buildExampleHistogramMetric(fillData bool) metricPair { - metric := metricPair{ - attributes: pcommon.NewMap(), - metric: pmetric.NewMetric(), - } - - metric.metric.SetEmptyHistogram() - metric.metric.SetName("histogram_metric_double_test") - - metric.attributes.PutStr("bar", "foo") - - if fillData { - dp := metric.metric.Histogram().DataPoints().AppendEmpty() - dp.Attributes().PutStr("container", "dolor") - dp.Attributes().PutStr("branch", "sumologic") - dp.BucketCounts().FromRaw([]uint64{0, 12, 7, 5, 8, 13}) - dp.ExplicitBounds().FromRaw([]float64{0.1, 0.2, 0.5, 0.8, 1}) - dp.SetTimestamp(1618124444.169 * 1e9) - dp.SetSum(45.6) - dp.SetCount(7) - - dp = metric.metric.Histogram().DataPoints().AppendEmpty() - dp.Attributes().PutStr("container", "sit") - dp.Attributes().PutStr("branch", "main") - dp.BucketCounts().FromRaw([]uint64{0, 10, 1, 1, 4, 6}) - dp.ExplicitBounds().FromRaw([]float64{0.1, 0.2, 0.5, 0.8, 1}) - dp.SetTimestamp(1608424699.186 * 1e9) - dp.SetSum(54.1) - dp.SetCount(98) - } else { - dp := metric.metric.Histogram().DataPoints().AppendEmpty() - dp.SetCount(0) - } - - return metric -} - -func metricPairToMetrics(mp []metricPair) pmetric.Metrics { - metrics := pmetric.NewMetrics() - metrics.ResourceMetrics().EnsureCapacity(len(mp)) - for num, record := range mp { - record.attributes.CopyTo(metrics.ResourceMetrics().AppendEmpty().Resource().Attributes()) - // TODO: Change metricPair to have an init metric func. - record.metric.CopyTo(metrics.ResourceMetrics().At(num).ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()) - } - - return metrics -} - -func fieldsFromMap(s map[string]string) fields { - attrMap := pcommon.NewMap() - for k, v := range s { - attrMap.PutStr(k, v) - } - return newFields(attrMap) -} diff --git a/exporter/sumologicexporter/test_data_test.go b/exporter/sumologicexporter/test_data_test.go new file mode 100644 index 000000000000..174751288148 --- /dev/null +++ b/exporter/sumologicexporter/test_data_test.go @@ -0,0 +1,292 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func exampleIntMetric() (pmetric.Metric, pcommon.Map) { + return buildExampleIntMetric(true) +} + +func buildExampleIntMetric(fillData bool) (pmetric.Metric, pcommon.Map) { + metric := pmetric.NewMetric() + metric.SetName("test.metric.data") + metric.SetUnit("bytes") + metric.SetEmptySum() + + if fillData { + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetTimestamp(1605534165 * 1e9) + dp.SetIntValue(14500) + } + + attributes := pcommon.NewMap() + attributes.PutStr("test", "test_value") + attributes.PutStr("test2", "second_value") + + return metric, attributes +} + +func exampleIntGaugeMetric() (pmetric.Metric, pcommon.Map) { + return buildExampleIntGaugeMetric(true) +} + +func buildExampleIntGaugeMetric(fillData bool) (pmetric.Metric, pcommon.Map) { + attributes := pcommon.NewMap() + metric := pmetric.NewMetric() + + metric.SetEmptyGauge() + metric.SetName("gauge_metric_name") + + attributes.PutStr("foo", "bar") + + if fillData { + dp := metric.Gauge().DataPoints().AppendEmpty() + dp.Attributes().PutStr("remote_name", "156920") + dp.Attributes().PutStr("url", "http://example_url") + dp.SetIntValue(124) + dp.SetTimestamp(1608124661.166 * 1e9) + + dp = metric.Gauge().DataPoints().AppendEmpty() + dp.Attributes().PutStr("remote_name", "156955") + dp.Attributes().PutStr("url", "http://another_url") + dp.SetIntValue(245) + dp.SetTimestamp(1608124662.166 * 1e9) + } + + return metric, attributes +} + +func exampleDoubleGaugeMetric() (pmetric.Metric, pcommon.Map) { + return buildExampleDoubleGaugeMetric(true) +} + +func buildExampleDoubleGaugeMetric(fillData bool) (pmetric.Metric, pcommon.Map) { + attributes := pcommon.NewMap() + metric := pmetric.NewMetric() + + metric.SetEmptyGauge() + metric.SetName("gauge_metric_name_double_test") + + attributes.PutStr("foo", "bar") + + if fillData { + dp := metric.Gauge().DataPoints().AppendEmpty() + dp.Attributes().PutStr("local_name", "156720") + dp.Attributes().PutStr("endpoint", "http://example_url") + dp.SetDoubleValue(33.4) + dp.SetTimestamp(1608124661.169 * 1e9) + + dp = metric.Gauge().DataPoints().AppendEmpty() + dp.Attributes().PutStr("local_name", "156155") + dp.Attributes().PutStr("endpoint", "http://another_url") + dp.SetDoubleValue(56.8) + dp.SetTimestamp(1608124662.186 * 1e9) + } + + return metric, attributes +} + +func exampleIntSumMetric() (pmetric.Metric, pcommon.Map) { + return buildExampleIntSumMetric(true) +} + +func buildExampleIntSumMetric(fillData bool) (pmetric.Metric, pcommon.Map) { + attributes := pcommon.NewMap() + metric := pmetric.NewMetric() + + metric.SetEmptySum() + metric.SetName("sum_metric_int_test") + + attributes.PutStr("foo", "bar") + + if fillData { + dp := metric.Sum().DataPoints().AppendEmpty() + dp.Attributes().PutStr("name", "156720") + dp.Attributes().PutStr("address", "http://example_url") + dp.SetIntValue(45) + dp.SetTimestamp(1608124444.169 * 1e9) + + dp = metric.Sum().DataPoints().AppendEmpty() + dp.Attributes().PutStr("name", "156155") + dp.Attributes().PutStr("address", "http://another_url") + dp.SetIntValue(1238) + dp.SetTimestamp(1608124699.186 * 1e9) + } + + return metric, attributes +} + +func exampleDoubleSumMetric() (pmetric.Metric, pcommon.Map) { + return buildExampleDoubleSumMetric(true) +} + +func buildExampleDoubleSumMetric(fillData bool) (pmetric.Metric, pcommon.Map) { + attributes := pcommon.NewMap() + metric := pmetric.NewMetric() + + metric.SetEmptySum() + metric.SetName("sum_metric_double_test") + + attributes.PutStr("foo", "bar") + + if fillData { + dp := metric.Sum().DataPoints().AppendEmpty() + dp.Attributes().PutStr("pod_name", "lorem") + dp.Attributes().PutStr("namespace", "default") + dp.SetDoubleValue(45.6) + dp.SetTimestamp(1618124444.169 * 1e9) + + dp = metric.Sum().DataPoints().AppendEmpty() + dp.Attributes().PutStr("pod_name", "opsum") + dp.Attributes().PutStr("namespace", "kube-config") + dp.SetDoubleValue(1238.1) + dp.SetTimestamp(1608424699.186 * 1e9) + } + + return metric, attributes +} + +func exampleSummaryMetric() (pmetric.Metric, pcommon.Map) { + return buildExampleSummaryMetric(true) +} + +func buildExampleSummaryMetric(fillData bool) (pmetric.Metric, pcommon.Map) { + attributes := pcommon.NewMap() + metric := pmetric.NewMetric() + + metric.SetEmptySummary() + metric.SetName("summary_metric_double_test") + + attributes.PutStr("foo", "bar") + + if fillData { + dp := metric.Summary().DataPoints().AppendEmpty() + dp.Attributes().PutStr("pod_name", "dolor") + dp.Attributes().PutStr("namespace", "sumologic") + dp.SetSum(45.6) + dp.SetCount(3) + dp.SetTimestamp(1618124444.169 * 1e9) + + quantile := dp.QuantileValues().AppendEmpty() + quantile.SetQuantile(0.6) + quantile.SetValue(0.7) + + quantile = dp.QuantileValues().AppendEmpty() + quantile.SetQuantile(2.6) + quantile.SetValue(4) + + dp = metric.Summary().DataPoints().AppendEmpty() + dp.Attributes().PutStr("pod_name", "sit") + dp.Attributes().PutStr("namespace", "main") + dp.SetSum(1238.1) + dp.SetCount(7) + dp.SetTimestamp(1608424699.186 * 1e9) + } + + return metric, attributes +} + +func exampleHistogramMetric() (pmetric.Metric, pcommon.Map) { + return buildExampleHistogramMetric(true) +} + +func buildExampleHistogramMetric(fillData bool) (pmetric.Metric, pcommon.Map) { + attributes := pcommon.NewMap() + metric := pmetric.NewMetric() + + metric.SetEmptyHistogram() + metric.SetName("histogram_metric_double_test") + + attributes.PutStr("bar", "foo") + + if fillData { + dp := metric.Histogram().DataPoints().AppendEmpty() + dp.Attributes().PutStr("container", "dolor") + dp.Attributes().PutStr("branch", "sumologic") + si := pcommon.NewUInt64Slice() + si.FromRaw([]uint64{0, 12, 7, 5, 8, 13}) + si.CopyTo(dp.BucketCounts()) + + sf := pcommon.NewFloat64Slice() + sf.FromRaw([]float64{0.1, 0.2, 0.5, 0.8, 1}) + sf.CopyTo(dp.ExplicitBounds()) + + dp.SetTimestamp(1618124444.169 * 1e9) + dp.SetSum(45.6) + dp.SetCount(7) + + dp = metric.Histogram().DataPoints().AppendEmpty() + dp.Attributes().PutStr("container", "sit") + dp.Attributes().PutStr("branch", "main") + + si = pcommon.NewUInt64Slice() + si.FromRaw([]uint64{0, 10, 1, 1, 4, 6}) + si.CopyTo(dp.BucketCounts()) + + sf = pcommon.NewFloat64Slice() + sf.FromRaw([]float64{0.1, 0.2, 0.5, 0.8, 1}) + sf.CopyTo(dp.ExplicitBounds()) + + dp.SetTimestamp(1608424699.186 * 1e9) + dp.SetSum(54.1) + dp.SetCount(98) + } else { + dp := metric.Histogram().DataPoints().AppendEmpty() + dp.SetCount(0) + } + + return metric, attributes +} + +func metricPairToMetrics(mp ...metricPair) pmetric.Metrics { + metrics := pmetric.NewMetrics() + metrics.ResourceMetrics().EnsureCapacity(len(mp)) + for _, record := range mp { + rms := metrics.ResourceMetrics().AppendEmpty() + record.attributes.CopyTo(rms.Resource().Attributes()) + // TODO: Change metricPair to have an init metric func. + record.metric.CopyTo(rms.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()) + } + + metrics.MarkReadOnly() + return metrics +} + +func metricAndAttrsToPdataMetrics(attributes pcommon.Map, ms ...pmetric.Metric) pmetric.Metrics { + metrics := pmetric.NewMetrics() + metrics.ResourceMetrics().EnsureCapacity(len(ms)) + + rms := metrics.ResourceMetrics().AppendEmpty() + attributes.CopyTo(rms.Resource().Attributes()) + + metricsSlice := rms.ScopeMetrics().AppendEmpty().Metrics() + + for _, record := range ms { + record.CopyTo(metricsSlice.AppendEmpty()) + } + + return metrics +} + +func metricAndAttributesToPdataMetrics(metric pmetric.Metric, attributes pcommon.Map) pmetric.Metrics { + metrics := pmetric.NewMetrics() + metrics.ResourceMetrics().EnsureCapacity(attributes.Len()) + rms := metrics.ResourceMetrics().AppendEmpty() + attributes.CopyTo(rms.Resource().Attributes()) + metric.CopyTo(rms.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()) + + return metrics +} + +func fieldsFromMap(s map[string]string) fields { + attrMap := pcommon.NewMap() + for k, v := range s { + attrMap.PutStr(k, v) + } + return newFields(attrMap) +}