diff --git a/exporter/opsrampotlpexporter/cfg-schema.yaml b/exporter/opsrampotlpexporter/cfg-schema.yaml new file mode 100644 index 000000000000..03f62d2e6436 --- /dev/null +++ b/exporter/opsrampotlpexporter/cfg-schema.yaml @@ -0,0 +1,151 @@ +type: '*opsrampotlpexporter.Config' +fields: +- name: timeout + type: time.Duration + kind: int64 + default: 5s + doc: | + Timeout is the timeout for every attempt to send data to the backend. +- name: sending_queue + type: exporterhelper.QueueSettings + kind: struct + fields: + - name: enabled + kind: bool + default: true + doc: | + Enabled indicates whether to not enqueue batches before sending to the consumerSender. + - name: num_consumers + kind: int + default: 10 + doc: | + NumConsumers is the number of consumers from the queue. + - name: queue_size + kind: int + default: 5000 + doc: | + QueueSize is the maximum number of batches allowed in queue at a given time. +- name: retry_on_failure + type: exporterhelper.RetrySettings + kind: struct + fields: + - name: enabled + kind: bool + default: true + doc: | + Enabled indicates whether to not retry sending batches in case of export failure. + - name: initial_interval + type: time.Duration + kind: int64 + default: 5s + doc: | + InitialInterval the time to wait after the first failure before retrying. + - name: max_interval + type: time.Duration + kind: int64 + default: 30s + doc: | + MaxInterval is the upper bound on backoff interval. Once this value is reached the delay between + consecutive retries will always be `MaxInterval`. + - name: max_elapsed_time + type: time.Duration + kind: int64 + default: 5m0s + doc: | + MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch. + Once this value is reached, the data is discarded. +- name: endpoint + kind: string + doc: | + The target to which the exporter is going to send traces or metrics, + using the gRPC protocol. The valid syntax is described at + https://github.com/grpc/grpc/blob/master/doc/naming.md. +- name: compression + kind: string + doc: | + The compression key for supported compression types within + collector. Supports `gzip`, `snappy` and `zstd`. +- name: ca_file + kind: string + doc: | + Path to the CA cert. For a client this verifies the server certificate. + For a server this verifies client certificates. If empty uses system root CA. + (optional) +- name: cert_file + kind: string + doc: | + Path to the TLS cert to use for TLS required connections. (optional) +- name: key_file + kind: string + doc: | + Path to the TLS key to use for TLS required connections. (optional) +- name: insecure + kind: bool + doc: | + In gRPC when set to true, this is used to disable the client transport security. + See https://godoc.org/google.golang.org/grpc#WithInsecure. + In HTTP, this disables verifying the server's certificate chain and host name + (InsecureSkipVerify in the tls Config). Please refer to + https://godoc.org/crypto/tls#Config for more information. + (optional, default false) +- name: server_name_override + kind: string + doc: | + ServerName requested by client for virtual hosting. + This sets the ServerName in the TLSConfig. Please refer to + https://godoc.org/crypto/tls#Config for more information. (optional) +- name: keepalive + type: '*configgrpc.KeepaliveClientConfig' + kind: ptr + doc: | + The keepalive parameters for gRPC client. See grpc.WithKeepaliveParams + (https://godoc.org/google.golang.org/grpc#WithKeepaliveParams). + fields: + - name: time + type: time.Duration + kind: int64 + - name: timeout + type: time.Duration + kind: int64 + - name: permit_without_stream + kind: bool +- name: read_buffer_size + kind: int + doc: | + ReadBufferSize for gRPC client. See grpc.WithReadBufferSize + (https://godoc.org/google.golang.org/grpc#WithReadBufferSize). +- name: write_buffer_size + kind: int + default: 524288 + doc: | + WriteBufferSize for gRPC gRPC. See grpc.WithWriteBufferSize + (https://godoc.org/google.golang.org/grpc#WithWriteBufferSize). +- name: wait_for_ready + kind: bool + doc: | + WaitForReady parameter configures client to wait for ready state before sending data. + (https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md) +- name: headers + type: map[string]string + kind: map + doc: | + The headers associated with gRPC requests. +- name: per_rpc_auth + type: '*configgrpc.PerRPCAuthConfig' + kind: ptr + doc: | + PerRPCAuth parameter configures the client to send authentication data on a per-RPC basis. + fields: + - name: type + kind: string + doc: | + AuthType represents the authentication type to use. Currently, only 'bearer' is supported. + - name: bearer_token + kind: string + doc: | + BearerToken specifies the bearer token to use for every RPC. +- name: balancer_name + kind: string + doc: | + Sets the balancer in grpclb_policy to discover the servers. Default is pick_first + https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md diff --git a/exporter/opsrampotlpexporter/config.go b/exporter/opsrampotlpexporter/config.go new file mode 100644 index 000000000000..fb936168bf92 --- /dev/null +++ b/exporter/opsrampotlpexporter/config.go @@ -0,0 +1,87 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opsrampotlpexporter // import "go.opentelemetry.io/collector/exporter/otlpexporter" + +import ( + "errors" + "fmt" + + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/exporter/exporterhelper" +) + +const grantType = "client_credentials" + +type MaskingSettings struct { + Regexp string `mapstructure:"regexp"` + Placeholder string `mapstructure:"placeholder"` +} + +type SecuritySettings struct { + OAuthServiceURL string `mapstructure:"oauth_service_url"` + ClientId string `mapstructure:"client_id"` + ClientSecret string `mapstructure:"client_secret"` +} + +type Credentials struct { + AccessToken string `json:"access_token"` + TokenType string `json:"token_type"` + ExpiresIn string `json:"expired_in"` + Scope string `json:"scope"` +} + +func (s *SecuritySettings) Validate() error { + if len(s.OAuthServiceURL) == 0 { + return errors.New("oauth service url missed") + } + + if len(s.ClientId) == 0 { + return errors.New("client_id missed") + } + + if len(s.ClientSecret) == 0 { + return errors.New("client_secret missed") + } + + return nil +} + +// Config defines configuration for OpenCensus exporter. +type Config struct { + config.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + exporterhelper.QueueSettings `mapstructure:"sending_queue"` + exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` + + Security SecuritySettings `mapstructure:"security"` + configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + Masking []MaskingSettings `mapstructure:"masking"` +} + +var _ config.Exporter = (*Config)(nil) + +// Validate checks if the exporter configuration is valid +func (cfg *Config) Validate() error { + if err := cfg.QueueSettings.Validate(); err != nil { + return fmt.Errorf("queue settings has invalid configuration: %w", err) + } + + if err := cfg.Security.Validate(); err != nil { + return fmt.Errorf("security settings has invalid configuration: %w", err) + } + + return nil +} diff --git a/exporter/opsrampotlpexporter/config_test.go b/exporter/opsrampotlpexporter/config_test.go new file mode 100644 index 000000000000..e76625993ba1 --- /dev/null +++ b/exporter/opsrampotlpexporter/config_test.go @@ -0,0 +1,95 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opsrampotlpexporter + +import ( + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configauth" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/service/servicetest" +) + +func TestLoadConfig(t *testing.T) { + factories, err := componenttest.NopFactories() + assert.NoError(t, err) + + factory := NewFactory() + factories.Exporters[typeStr] = factory + + cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "config.yaml"), factories) + + require.NoError(t, err) + require.NotNil(t, cfg) + + e := cfg.Exporters[config.NewComponentID(typeStr)] + + //e1 := cfg.Exporters[config.NewComponentIDWithName(typeStr, "2")] + assert.Equal(t, e, + &Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "")), + TimeoutSettings: exporterhelper.TimeoutSettings{ + Timeout: 10 * time.Second, + }, + RetrySettings: exporterhelper.RetrySettings{ + Enabled: true, + InitialInterval: 10 * time.Second, + MaxInterval: 1 * time.Minute, + MaxElapsedTime: 10 * time.Minute, + }, + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: 2, + QueueSize: 10, + }, + Security: SecuritySettings{ + ClientId: "id", + ClientSecret: "secret", + OAuthServiceURL: "url", + }, + GRPCClientSettings: configgrpc.GRPCClientSettings{ + Headers: map[string]string{ + "can you have a . here?": "F0000000-0000-0000-0000-000000000000", + "header1": "234", + "another": "somevalue", + }, + Endpoint: "1.2.3.4:1234", + Compression: "gzip", + TLSSetting: configtls.TLSClientSetting{ + TLSSetting: configtls.TLSSetting{ + CAFile: "/var/lib/mycert.pem", + }, + Insecure: false, + }, + Keepalive: &configgrpc.KeepaliveClientConfig{ + Time: 20 * time.Second, + PermitWithoutStream: true, + Timeout: 30 * time.Second, + }, + WriteBufferSize: 512 * 1024, + BalancerName: "round_robin", + Auth: &configauth.Authentication{AuthenticatorID: config.NewComponentID("nop")}, + }, + }) +} diff --git a/exporter/opsrampotlpexporter/doc.go b/exporter/opsrampotlpexporter/doc.go new file mode 100644 index 000000000000..d32e80039d53 --- /dev/null +++ b/exporter/opsrampotlpexporter/doc.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package otlpexporter exports data by using the OTLP format to a gPRC endpoint. +package opsrampotlpexporter // import "go.opentelemetry.io/collector/exporter/otlpexporter" diff --git a/exporter/opsrampotlpexporter/factory.go b/exporter/opsrampotlpexporter/factory.go new file mode 100644 index 000000000000..732650ac53dc --- /dev/null +++ b/exporter/opsrampotlpexporter/factory.go @@ -0,0 +1,125 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opsrampotlpexporter // import "go.opentelemetry.io/collector/exporter/otlpexporter" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configcompression" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter/exporterhelper" +) + +const ( + // The value of "type" key in configuration. + typeStr = "opsrampotlp" +) + +// NewFactory creates a factory for OTLP exporter. +func NewFactory() component.ExporterFactory { + return component.NewExporterFactory( + typeStr, + createDefaultConfig, + component.WithTracesExporter(createTracesExporter), + component.WithMetricsExporter(createMetricsExporter), + component.WithLogsExporter(createLogsExporter)) +} + +func createDefaultConfig() config.Exporter { + return &Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)), + TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), + RetrySettings: exporterhelper.NewDefaultRetrySettings(), + QueueSettings: exporterhelper.NewDefaultQueueSettings(), + GRPCClientSettings: configgrpc.GRPCClientSettings{ + Headers: map[string]string{}, + // Default to gzip compression + Compression: configcompression.Gzip, + // We almost read 0 bytes, so no need to tune ReadBufferSize. + WriteBufferSize: 512 * 1024, + }, + } +} + +func createTracesExporter( + _ context.Context, + set component.ExporterCreateSettings, + cfg config.Exporter, +) (component.TracesExporter, error) { + oce, err := newExporter(cfg, set) + if err != nil { + return nil, err + } + oCfg := cfg.(*Config) + return exporterhelper.NewTracesExporter( + cfg, + set, + oce.pushTraces, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exporterhelper.WithTimeout(oCfg.TimeoutSettings), + exporterhelper.WithRetry(oCfg.RetrySettings), + exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithStart(oce.start), + exporterhelper.WithShutdown(oce.shutdown)) +} + +func createMetricsExporter( + _ context.Context, + set component.ExporterCreateSettings, + cfg config.Exporter, +) (component.MetricsExporter, error) { + oce, err := newExporter(cfg, set) + if err != nil { + return nil, err + } + oCfg := cfg.(*Config) + return exporterhelper.NewMetricsExporter( + cfg, + set, + oce.pushMetrics, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exporterhelper.WithTimeout(oCfg.TimeoutSettings), + exporterhelper.WithRetry(oCfg.RetrySettings), + exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithStart(oce.start), + exporterhelper.WithShutdown(oce.shutdown), + ) +} + +func createLogsExporter( + _ context.Context, + set component.ExporterCreateSettings, + cfg config.Exporter, +) (component.LogsExporter, error) { + oce, err := newExporter(cfg, set) + if err != nil { + return nil, err + } + oCfg := cfg.(*Config) + return exporterhelper.NewLogsExporter( + cfg, + set, + oce.pushLogs, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exporterhelper.WithTimeout(oCfg.TimeoutSettings), + exporterhelper.WithRetry(oCfg.RetrySettings), + exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithStart(oce.start), + exporterhelper.WithShutdown(oce.shutdown), + ) +} diff --git a/exporter/opsrampotlpexporter/factory_test.go b/exporter/opsrampotlpexporter/factory_test.go new file mode 100644 index 000000000000..8e06d014ff25 --- /dev/null +++ b/exporter/opsrampotlpexporter/factory_test.go @@ -0,0 +1,236 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opsrampotlpexporter + +import ( + "context" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configcompression" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configtest" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/exporter/exporterhelper" +) + +func TestCreateDefaultConfig(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, configtest.CheckConfigStruct(cfg)) + ocfg, ok := factory.CreateDefaultConfig().(*Config) + assert.True(t, ok) + assert.Equal(t, ocfg.RetrySettings, exporterhelper.NewDefaultRetrySettings()) + assert.Equal(t, ocfg.QueueSettings, exporterhelper.NewDefaultQueueSettings()) + assert.Equal(t, ocfg.TimeoutSettings, exporterhelper.NewDefaultTimeoutSettings()) + assert.Equal(t, ocfg.Compression, configcompression.Gzip) +} + +func TestCreateMetricsExporter(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPCClientSettings.Endpoint = testutil.GetAvailableLocalAddress(t) + cfg.Security = SecuritySettings{ + OAuthServiceURL: "https://asura.opsramp.net/auth/oauth/token?agent=true", + ClientId: "mamRxRJB796HYtWYxqeDzeEXCKSswnsr", + ClientSecret: "Da2achZqvHF7tKDaSP3FCkHE2PKcY6twRxwZEnEYQHc5GADgHy5VZDBxdeKhNbrw", + } + + set := componenttest.NewNopExporterCreateSettings() + oexp, err := factory.CreateMetricsExporter(context.Background(), set, cfg) + require.Nil(t, err) + require.NotNil(t, oexp) +} + +func TestCreateTracesExporter(t *testing.T) { + endpoint := testutil.GetAvailableLocalAddress(t) + tests := []struct { + name string + config Config + mustFailOnCreate bool + mustFailOnStart bool + }{ + { + name: "NoEndpoint", + config: Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)), + GRPCClientSettings: configgrpc.GRPCClientSettings{ + Endpoint: "", + }, + }, + mustFailOnCreate: true, + }, + { + name: "UseSecure", + config: Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)), + GRPCClientSettings: configgrpc.GRPCClientSettings{ + Endpoint: endpoint, + TLSSetting: configtls.TLSClientSetting{ + Insecure: false, + }, + }, + }, + }, + { + name: "Keepalive", + config: Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)), + GRPCClientSettings: configgrpc.GRPCClientSettings{ + Endpoint: endpoint, + Keepalive: &configgrpc.KeepaliveClientConfig{ + Time: 30 * time.Second, + Timeout: 25 * time.Second, + PermitWithoutStream: true, + }, + }, + }, + }, + { + name: "NoneCompression", + config: Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)), + GRPCClientSettings: configgrpc.GRPCClientSettings{ + Endpoint: endpoint, + Compression: "none", + }, + }, + }, + { + name: "GzipCompression", + config: Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)), + GRPCClientSettings: configgrpc.GRPCClientSettings{ + Endpoint: endpoint, + Compression: configcompression.Gzip, + }, + }, + }, + { + name: "SnappyCompression", + config: Config{ + GRPCClientSettings: configgrpc.GRPCClientSettings{ + Endpoint: endpoint, + Compression: configcompression.Snappy, + }, + }, + }, + { + name: "ZstdCompression", + config: Config{ + GRPCClientSettings: configgrpc.GRPCClientSettings{ + Endpoint: endpoint, + Compression: configcompression.Zstd, + }, + }, + }, + { + name: "Headers", + config: Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)), + GRPCClientSettings: configgrpc.GRPCClientSettings{ + Endpoint: endpoint, + Headers: map[string]string{ + "hdr1": "val1", + "hdr2": "val2", + }, + }, + }, + }, + { + name: "NumConsumers", + config: Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)), + GRPCClientSettings: configgrpc.GRPCClientSettings{ + Endpoint: endpoint, + }, + }, + }, + { + name: "CaCert", + config: Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)), + GRPCClientSettings: configgrpc.GRPCClientSettings{ + Endpoint: endpoint, + TLSSetting: configtls.TLSClientSetting{ + TLSSetting: configtls.TLSSetting{ + CAFile: filepath.Join("testdata", "test_cert.pem"), + }, + }, + }, + }, + }, + { + name: "CertPemFileError", + config: Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)), + GRPCClientSettings: configgrpc.GRPCClientSettings{ + Endpoint: endpoint, + TLSSetting: configtls.TLSClientSetting{ + TLSSetting: configtls.TLSSetting{ + CAFile: "nosuchfile", + }, + }, + }, + }, + mustFailOnStart: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := NewFactory() + set := componenttest.NewNopExporterCreateSettings() + consumer, err := factory.CreateTracesExporter(context.Background(), set, &tt.config) + if tt.mustFailOnCreate { + assert.NotNil(t, err) + return + } + assert.NoError(t, err) + assert.NotNil(t, consumer) + err = consumer.Start(context.Background(), componenttest.NewNopHost()) + if tt.mustFailOnStart { + assert.Error(t, err) + return + } + assert.NoError(t, err) + err = consumer.Shutdown(context.Background()) + if err != nil { + // Since the endpoint of OTLP exporter doesn't actually exist, + // exporter may already stop because it cannot connect. + assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing") + } + }) + } +} + +func TestCreateLogsExporter(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPCClientSettings.Endpoint = testutil.GetAvailableLocalAddress(t) + + set := componenttest.NewNopExporterCreateSettings() + oexp, err := factory.CreateLogsExporter(context.Background(), set, cfg) + require.Nil(t, err) + require.NotNil(t, oexp) +} diff --git a/exporter/opsrampotlpexporter/otlp.go b/exporter/opsrampotlpexporter/otlp.go new file mode 100644 index 000000000000..bff3913838eb --- /dev/null +++ b/exporter/opsrampotlpexporter/otlp.go @@ -0,0 +1,308 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opsrampotlpexporter // import "go.opentelemetry.io/collector/exporter/otlpexporter" + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "regexp" + "runtime" + "time" + + "go.opentelemetry.io/collector/config" + + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" +) + +type exporter struct { + // Input configuration. + config *Config + + // gRPC clients and connection. + traceExporter ptraceotlp.Client + metricExporter pmetricotlp.Client + logExporter plogotlp.Client + clientConn *grpc.ClientConn + metadata metadata.MD + callOptions []grpc.CallOption + + settings component.TelemetrySettings + + // Default user-agent header. + userAgent string + accessToken string +} + +// Crete new exporter and start it. The exporter will begin connecting but +// this function may return before the connection is established. +func newExporter(cfg config.Exporter, set component.ExporterCreateSettings) (*exporter, error) { + oCfg := cfg.(*Config) + + accessToken, err := getAuthToken(oCfg.Security) + if err != nil { + return nil, fmt.Errorf("access token isn't available: %w", err) + } + + if oCfg.Endpoint == "" { + return nil, errors.New("OTLP exporter config requires an Endpoint") + } + + userAgent := fmt.Sprintf("%s/%s (%s/%s)", + set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH) + + return &exporter{config: oCfg, settings: set.TelemetrySettings, userAgent: userAgent, accessToken: accessToken}, nil +} + +type Creds struct { + ClientID string `json:"client_id"` + ClientSecret string `json:"client_secret"` + GrantType string `json:"grant_type"` +} + +type Person struct { + Name string `json:"name"` + Age int `json:"age"` +} + +func getAuthToken(cfg SecuritySettings) (string, error) { + + client := &http.Client{} + data := url.Values{} + data.Set("client_id", cfg.ClientId) + data.Set("client_secret", cfg.ClientSecret) + data.Set("grant_type", grantType) + + request, err := http.NewRequest("POST", cfg.OAuthServiceURL, bytes.NewBufferString(data.Encode())) + if err != nil { + return "", err + } + request.Header.Set("Accept", "application/json") + request.Header.Set("Content-Type", "application/x-www-form-urlencoded") + resp, err := client.Do(request) + + if err != nil { + return "", err + } + defer resp.Body.Close() + jsonResp, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + + var credentials Credentials + if err := json.Unmarshal(jsonResp, &credentials); err != nil { + return "", err + } + return credentials.AccessToken, nil + +} + +// start actually creates the gRPC connection. The client construction is deferred till this point as this +// is the only place we get hold of Extensions which are required to construct auth round tripper. +func (e *exporter) start(ctx context.Context, host component.Host) (err error) { + dialOpts, err := e.config.GRPCClientSettings.ToDialOptions(host, e.settings) + if err != nil { + return err + } + dialOpts = append(dialOpts, grpc.WithUserAgent(e.userAgent)) + + if e.clientConn, err = grpc.DialContext(ctx, e.config.GRPCClientSettings.SanitizedEndpoint(), dialOpts...); err != nil { + return err + } + + e.traceExporter = ptraceotlp.NewClient(e.clientConn) + e.metricExporter = pmetricotlp.NewClient(e.clientConn) + e.logExporter = plogotlp.NewClient(e.clientConn) + e.metadata = metadata.New(e.config.GRPCClientSettings.Headers) + e.metadata.Set("Authorization", fmt.Sprintf("Bearer %s", e.accessToken)) + e.callOptions = []grpc.CallOption{ + grpc.WaitForReady(e.config.GRPCClientSettings.WaitForReady), + } + + return +} + +func (e *exporter) shutdown(context.Context) error { + return e.clientConn.Close() +} + +func (e *exporter) pushTraces(ctx context.Context, td ptrace.Traces) error { + req := ptraceotlp.NewRequestFromTraces(td) + _, err := e.traceExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) + return processError(err) +} + +func (e *exporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { + req := pmetricotlp.NewRequestFromMetrics(md) + _, err := e.metricExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) + return processError(err) +} + +func (e *exporter) pushLogs(ctx context.Context, ld plog.Logs) error { + if e.config.Masking != nil { + e.applyMasking(ld) + } + + req := plogotlp.NewRequestFromLogs(ld) + + _, err := e.logExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) + + // trying to get new access token in case of expiration + if err != nil { + st := status.Convert(err) + if st.Code() == codes.Unauthenticated { + if err := e.updateExpiredToken(); err != nil { + return fmt.Errorf("couldn't retreive new token instead of expired: %w", err) + } + + _, err = e.logExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) + if err != nil { + return err + } + } + return processError(err) + } + return nil +} + +func (e *exporter) updateExpiredToken() error { + accessToken, err := getAuthToken(e.config.Security) + if err != nil { + return err + } + e.metadata.Set("bearer", accessToken) + return nil +} + +func (e *exporter) enhanceContext(ctx context.Context) context.Context { + if e.metadata.Len() > 0 { + return metadata.NewOutgoingContext(ctx, e.metadata) + } + return ctx +} + +// Send a trace or metrics request to the server. "perform" function is expected to make +// the actual gRPC unary call that sends the request. This function implements the +// common OTLP logic around request handling such as retries and throttling. +func processError(err error) error { + if err == nil { + // Request is successful, we are done. + return nil + } + + // We have an error, check gRPC status code. + + st := status.Convert(err) + if st.Code() == codes.OK { + // Not really an error, still success. + return nil + } + + // Now, this is this a real error. + + retryInfo := getRetryInfo(st) + + if !shouldRetry(st.Code(), retryInfo) { + // It is not a retryable error, we should not retry. + return consumererror.NewPermanent(err) + } + + // Check if server returned throttling information. + throttleDuration := getThrottleDuration(retryInfo) + if throttleDuration != 0 { + // We are throttled. Wait before retrying as requested by the server. + return exporterhelper.NewThrottleRetry(err, throttleDuration) + } + + // Need to retry. + + return err +} + +func shouldRetry(code codes.Code, retryInfo *errdetails.RetryInfo) bool { + switch code { + case codes.Canceled, + codes.DeadlineExceeded, + codes.Aborted, + codes.OutOfRange, + codes.Unavailable, + codes.DataLoss: + // These are retryable errors. + return true + case codes.ResourceExhausted: + // Retry only if RetryInfo was supplied by the server. + // This indicates that the server can still recover from resource exhaustion. + return retryInfo != nil + } + // Don't retry on any other code. + return false +} + +func getRetryInfo(status *status.Status) *errdetails.RetryInfo { + for _, detail := range status.Details() { + if t, ok := detail.(*errdetails.RetryInfo); ok { + return t + } + } + return nil +} + +func getThrottleDuration(t *errdetails.RetryInfo) time.Duration { + if t == nil || t.RetryDelay == nil { + return 0 + } + if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 { + return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond + } + return 0 +} + +func (e *exporter) applyMasking(ld plog.Logs) { + + for i := 0; i < ld.ResourceLogs().Len(); i++ { + resLogs := ld.ResourceLogs().At(i) + for k := 0; k < resLogs.ScopeLogs().Len(); k++ { + scopedLog := resLogs.ScopeLogs().At(k) + for z := 0; z < scopedLog.LogRecords().Len(); z++ { + log := scopedLog.LogRecords().At(z) + for _, setting := range e.config.Masking { + regexp := regexp.MustCompile(setting.Regexp) + log.Body().SetStringVal(regexp.ReplaceAllString(log.Body().AsString(), setting.Placeholder)) + } + } + } + } + +} diff --git a/exporter/opsrampotlpexporter/otlp_test.go b/exporter/opsrampotlpexporter/otlp_test.go new file mode 100644 index 000000000000..f3f3ab6a7f7b --- /dev/null +++ b/exporter/opsrampotlpexporter/otlp_test.go @@ -0,0 +1,729 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opsrampotlpexporter + +import ( + "context" + "fmt" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" + "net" + "path/filepath" + "regexp" + "runtime" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" +) + +type mockReceiver struct { + srv *grpc.Server + requestCount *atomic.Int32 + totalItems *atomic.Int32 + mux sync.Mutex + metadata metadata.MD +} + +func (r *mockReceiver) GetMetadata() metadata.MD { + r.mux.Lock() + defer r.mux.Unlock() + return r.metadata +} + +type mockTracesReceiver struct { + mockReceiver + exportError error + lastRequest ptrace.Traces +} + +func (r *mockTracesReceiver) Export(ctx context.Context, req ptraceotlp.Request) (ptraceotlp.Response, error) { + r.requestCount.Inc() + td := req.Traces() + r.totalItems.Add(int32(td.SpanCount())) + r.mux.Lock() + defer r.mux.Unlock() + r.lastRequest = td + r.metadata, _ = metadata.FromIncomingContext(ctx) + return ptraceotlp.NewResponse(), r.exportError +} + +func (r *mockTracesReceiver) GetLastRequest() ptrace.Traces { + r.mux.Lock() + defer r.mux.Unlock() + return r.lastRequest +} + +func otlpTracesReceiverOnGRPCServer(ln net.Listener, useTLS bool) (*mockTracesReceiver, error) { + sopts := []grpc.ServerOption{} + + if useTLS { + _, currentFile, _, _ := runtime.Caller(0) + basepath := filepath.Dir(currentFile) + certpath := filepath.Join(basepath, filepath.Join("testdata", "test_cert.pem")) + keypath := filepath.Join(basepath, filepath.Join("testdata", "test_key.pem")) + + creds, err := credentials.NewServerTLSFromFile(certpath, keypath) + if err != nil { + return nil, err + } + sopts = append(sopts, grpc.Creds(creds)) + } + + rcv := &mockTracesReceiver{ + mockReceiver: mockReceiver{ + srv: grpc.NewServer(sopts...), + requestCount: atomic.NewInt32(0), + totalItems: atomic.NewInt32(0), + }, + } + + // Now run it as a gRPC server + ptraceotlp.RegisterServer(rcv.srv, rcv) + go func() { + _ = rcv.srv.Serve(ln) + }() + + return rcv, nil +} + +type mockLogsReceiver struct { + mockReceiver + lastRequest plog.Logs +} + +func (r *mockLogsReceiver) Export(ctx context.Context, req plogotlp.Request) (plogotlp.Response, error) { + r.requestCount.Inc() + ld := req.Logs() + r.totalItems.Add(int32(ld.LogRecordCount())) + r.mux.Lock() + defer r.mux.Unlock() + r.lastRequest = ld + r.metadata, _ = metadata.FromIncomingContext(ctx) + return plogotlp.NewResponse(), nil +} + +func (r *mockLogsReceiver) GetLastRequest() plog.Logs { + r.mux.Lock() + defer r.mux.Unlock() + return r.lastRequest +} + +func otlpLogsReceiverOnGRPCServer(ln net.Listener) *mockLogsReceiver { + rcv := &mockLogsReceiver{ + mockReceiver: mockReceiver{ + srv: grpc.NewServer(), + requestCount: atomic.NewInt32(0), + totalItems: atomic.NewInt32(0), + }, + } + + // Now run it as a gRPC server + plogotlp.RegisterServer(rcv.srv, rcv) + go func() { + _ = rcv.srv.Serve(ln) + }() + + return rcv +} + +type mockMetricsReceiver struct { + mockReceiver + lastRequest pmetric.Metrics +} + +func (r *mockMetricsReceiver) Export(ctx context.Context, req pmetricotlp.Request) (pmetricotlp.Response, error) { + md := req.Metrics() + r.requestCount.Inc() + r.totalItems.Add(int32(md.DataPointCount())) + r.mux.Lock() + defer r.mux.Unlock() + r.lastRequest = md + r.metadata, _ = metadata.FromIncomingContext(ctx) + return pmetricotlp.NewResponse(), nil +} + +func (r *mockMetricsReceiver) GetLastRequest() pmetric.Metrics { + r.mux.Lock() + defer r.mux.Unlock() + return r.lastRequest +} + +func otlpMetricsReceiverOnGRPCServer(ln net.Listener) *mockMetricsReceiver { + rcv := &mockMetricsReceiver{ + mockReceiver: mockReceiver{ + srv: grpc.NewServer(), + requestCount: atomic.NewInt32(0), + totalItems: atomic.NewInt32(0), + }, + } + + // Now run it as a gRPC server + pmetricotlp.RegisterServer(rcv.srv, rcv) + go func() { + _ = rcv.srv.Serve(ln) + }() + + return rcv +} + +func TestSendTraces(t *testing.T) { + // Start an OTLP-compatible receiver. + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false) + // Also closes the connection. + defer rcv.srv.GracefulStop() + + // Start an OTLP exporter and point to the receiver. + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + Headers: map[string]string{ + "header": "header-value", + }, + } + cfg.Security = SecuritySettings{ + OAuthServiceURL: "https://asura.opsramp.net/auth/oauth/token?agent=true", + ClientId: "mamRxRJB796HYtWYxqeDzeEXCKSswnsr", + ClientSecret: "Da2achZqvHF7tKDaSP3FCkHE2PKcY6twRxwZEnEYQHc5GADgHy5VZDBxdeKhNbrw", + } + set := componenttest.NewNopExporterCreateSettings() + set.BuildInfo.Description = "Collector" + set.BuildInfo.Version = "1.2.3test" + exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + host := componenttest.NewNopHost() + assert.NoError(t, exp.Start(context.Background(), host)) + + // Ensure that initially there is no data in the receiver. + assert.EqualValues(t, 0, rcv.requestCount.Load()) + + // Send empty trace. + td := ptrace.NewTraces() + assert.NoError(t, exp.ConsumeTraces(context.Background(), td)) + + // Wait until it is received. + assert.Eventually(t, func() bool { + return rcv.requestCount.Load() > 0 + }, 10*time.Second, 5*time.Millisecond) + + // Ensure it was received empty. + assert.EqualValues(t, 0, rcv.totalItems.Load()) + + // A trace with 2 spans. + td = testdata.GenerateTracesTwoSpansSameResource() + + err = exp.ConsumeTraces(context.Background(), td) + assert.NoError(t, err) + + // Wait until it is received. + assert.Eventually(t, func() bool { + return rcv.requestCount.Load() > 1 + }, 10*time.Second, 5*time.Millisecond) + + expectedHeader := []string{"header-value"} + + // Verify received span. + assert.EqualValues(t, 2, rcv.totalItems.Load()) + assert.EqualValues(t, 2, rcv.requestCount.Load()) + assert.EqualValues(t, td, rcv.GetLastRequest()) + + md := rcv.GetMetadata() + require.EqualValues(t, md.Get("header"), expectedHeader) + require.Equal(t, len(md.Get("User-Agent")), 1) + require.Contains(t, md.Get("User-Agent")[0], "Collector/1.2.3test") +} + +func TestSendTracesWhenEndpointHasHttpScheme(t *testing.T) { + tests := []struct { + name string + useTLS bool + scheme string + gRPCClientSettings configgrpc.GRPCClientSettings + }{ + { + name: "Use https scheme", + useTLS: true, + scheme: "https://", + gRPCClientSettings: configgrpc.GRPCClientSettings{}, + }, + { + name: "Use http scheme", + useTLS: false, + scheme: "http://", + gRPCClientSettings: configgrpc.GRPCClientSettings{ + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // Start an OTLP-compatible receiver. + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + rcv, err := otlpTracesReceiverOnGRPCServer(ln, test.useTLS) + require.NoError(t, err, "Failed to start mock OTLP receiver") + // Also closes the connection. + defer rcv.srv.GracefulStop() + + // Start an OTLP exporter and point to the receiver. + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.Security = SecuritySettings{ + OAuthServiceURL: "https://asura.opsramp.net/auth/oauth/token?agent=true", + ClientId: "mamRxRJB796HYtWYxqeDzeEXCKSswnsr", + ClientSecret: "Da2achZqvHF7tKDaSP3FCkHE2PKcY6twRxwZEnEYQHc5GADgHy5VZDBxdeKhNbrw", + } + cfg.GRPCClientSettings = test.gRPCClientSettings + cfg.GRPCClientSettings.Endpoint = test.scheme + ln.Addr().String() + if test.useTLS { + cfg.GRPCClientSettings.TLSSetting.InsecureSkipVerify = true + } + set := componenttest.NewNopExporterCreateSettings() + exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + host := componenttest.NewNopHost() + assert.NoError(t, exp.Start(context.Background(), host)) + + // Ensure that initially there is no data in the receiver. + assert.EqualValues(t, 0, rcv.requestCount.Load()) + + // Send empty trace. + td := ptrace.NewTraces() + assert.NoError(t, exp.ConsumeTraces(context.Background(), td)) + + // Wait until it is received. + assert.Eventually(t, func() bool { + return rcv.requestCount.Load() > 0 + }, 10*time.Second, 5*time.Millisecond) + + // Ensure it was received empty. + assert.EqualValues(t, 0, rcv.totalItems.Load()) + }) + } +} + +func TestSendMetrics(t *testing.T) { + // Start an OTLP-compatible receiver. + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + rcv := otlpMetricsReceiverOnGRPCServer(ln) + // Also closes the connection. + defer rcv.srv.GracefulStop() + + // Start an OTLP exporter and point to the receiver. + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + Headers: map[string]string{ + "header": "header-value", + }, + } + cfg.Security = SecuritySettings{ + OAuthServiceURL: "https://asura.opsramp.net/auth/oauth/token?agent=true", + ClientId: "mamRxRJB796HYtWYxqeDzeEXCKSswnsr", + ClientSecret: "Da2achZqvHF7tKDaSP3FCkHE2PKcY6twRxwZEnEYQHc5GADgHy5VZDBxdeKhNbrw", + } + set := componenttest.NewNopExporterCreateSettings() + set.BuildInfo.Description = "Collector" + set.BuildInfo.Version = "1.2.3test" + exp, err := factory.CreateMetricsExporter(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + host := componenttest.NewNopHost() + + assert.NoError(t, exp.Start(context.Background(), host)) + + // Ensure that initially there is no data in the receiver. + assert.EqualValues(t, 0, rcv.requestCount.Load()) + + // Send empty metric. + md := pmetric.NewMetrics() + assert.NoError(t, exp.ConsumeMetrics(context.Background(), md)) + + // Wait until it is received. + assert.Eventually(t, func() bool { + return rcv.requestCount.Load() > 0 + }, 10*time.Second, 5*time.Millisecond) + + // Ensure it was received empty. + assert.EqualValues(t, 0, rcv.totalItems.Load()) + + // Send two metrics. + md = testdata.GenerateMetricsTwoMetrics() + + err = exp.ConsumeMetrics(context.Background(), md) + assert.NoError(t, err) + + // Wait until it is received. + assert.Eventually(t, func() bool { + return rcv.requestCount.Load() > 1 + }, 10*time.Second, 5*time.Millisecond) + + expectedHeader := []string{"header-value"} + + // Verify received metrics. + assert.EqualValues(t, 2, rcv.requestCount.Load()) + assert.EqualValues(t, 4, rcv.totalItems.Load()) + assert.EqualValues(t, md, rcv.GetLastRequest()) + + mdata := rcv.GetMetadata() + require.EqualValues(t, mdata.Get("header"), expectedHeader) + require.Equal(t, len(mdata.Get("User-Agent")), 1) + require.Contains(t, mdata.Get("User-Agent")[0], "Collector/1.2.3test") +} + +func TestSendTraceDataServerDownAndUp(t *testing.T) { + // Find the addr, but don't start the server. + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + + // Start an OTLP exporter and point to the receiver. + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + // Disable queuing to ensure that we execute the request when calling ConsumeTraces + // otherwise we will not see the error. + cfg.QueueSettings.Enabled = false + cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + // Need to wait for every request blocking until either request timeouts or succeed. + // Do not rely on external retry logic here, if that is intended set InitialInterval to 100ms. + WaitForReady: true, + } + cfg.Security = SecuritySettings{ + OAuthServiceURL: "https://asura.opsramp.net/auth/oauth/token?agent=true", + ClientId: "mamRxRJB796HYtWYxqeDzeEXCKSswnsr", + ClientSecret: "Da2achZqvHF7tKDaSP3FCkHE2PKcY6twRxwZEnEYQHc5GADgHy5VZDBxdeKhNbrw", + } + + set := componenttest.NewNopExporterCreateSettings() + exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + host := componenttest.NewNopHost() + + assert.NoError(t, exp.Start(context.Background(), host)) + + // A trace with 2 spans. + td := testdata.GenerateTracesTwoSpansSameResource() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + assert.Error(t, exp.ConsumeTraces(ctx, td)) + assert.EqualValues(t, context.DeadlineExceeded, ctx.Err()) + cancel() + + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + assert.Error(t, exp.ConsumeTraces(ctx, td)) + assert.EqualValues(t, context.DeadlineExceeded, ctx.Err()) + cancel() + + startServerAndMakeRequest(t, exp, td, ln) + + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + assert.Error(t, exp.ConsumeTraces(ctx, td)) + assert.EqualValues(t, context.DeadlineExceeded, ctx.Err()) + cancel() + + // First call to startServerAndMakeRequest closed the connection. There is a race condition here that the + // port may be reused, if this gets flaky rethink what to do. + ln, err = net.Listen("tcp", ln.Addr().String()) + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + startServerAndMakeRequest(t, exp, td, ln) + + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + assert.Error(t, exp.ConsumeTraces(ctx, td)) + assert.EqualValues(t, context.DeadlineExceeded, ctx.Err()) + cancel() +} + +func TestSendTraceDataServerStartWhileRequest(t *testing.T) { + // Find the addr, but don't start the server. + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + + // Start an OTLP exporter and point to the receiver. + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + } + cfg.Security = SecuritySettings{ + OAuthServiceURL: "https://asura.opsramp.net/auth/oauth/token?agent=true", + ClientId: "mamRxRJB796HYtWYxqeDzeEXCKSswnsr", + ClientSecret: "Da2achZqvHF7tKDaSP3FCkHE2PKcY6twRxwZEnEYQHc5GADgHy5VZDBxdeKhNbrw", + } + set := componenttest.NewNopExporterCreateSettings() + exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + host := componenttest.NewNopHost() + + assert.NoError(t, exp.Start(context.Background(), host)) + + // A trace with 2 spans. + td := testdata.GenerateTracesTwoSpansSameResource() + done := make(chan bool, 1) + defer close(done) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + go func() { + assert.NoError(t, exp.ConsumeTraces(ctx, td)) + done <- true + }() + + time.Sleep(2 * time.Second) + rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false) + defer rcv.srv.GracefulStop() + // Wait until one of the conditions below triggers. + select { + case <-ctx.Done(): + t.Fail() + case <-done: + assert.NoError(t, ctx.Err()) + } + cancel() +} + +func TestSendTracesOnResourceExhaustion(t *testing.T) { + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err) + rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false) + rcv.exportError = status.Error(codes.ResourceExhausted, "resource exhausted") + defer rcv.srv.GracefulStop() + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.RetrySettings.InitialInterval = 0 + cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + } + cfg.Security = SecuritySettings{ + OAuthServiceURL: "https://asura.opsramp.net/auth/oauth/token?agent=true", + ClientId: "mamRxRJB796HYtWYxqeDzeEXCKSswnsr", + ClientSecret: "Da2achZqvHF7tKDaSP3FCkHE2PKcY6twRxwZEnEYQHc5GADgHy5VZDBxdeKhNbrw", + } + set := componenttest.NewNopExporterCreateSettings() + exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + host := componenttest.NewNopHost() + assert.NoError(t, exp.Start(context.Background(), host)) + + assert.EqualValues(t, 0, rcv.requestCount.Load()) + + td := ptrace.NewTraces() + assert.NoError(t, exp.ConsumeTraces(context.Background(), td)) + + assert.Never(t, func() bool { + return rcv.requestCount.Load() > 1 + }, 1*time.Second, 5*time.Millisecond, "Should not retry if RetryInfo is not included into status details by the server.") + + rcv.requestCount.Swap(0) + + st := status.New(codes.ResourceExhausted, "resource exhausted") + st, _ = st.WithDetails(&errdetails.RetryInfo{ + RetryDelay: durationpb.New(100 * time.Millisecond), + }) + rcv.exportError = st.Err() + + assert.NoError(t, exp.ConsumeTraces(context.Background(), td)) + + assert.Eventually(t, func() bool { + return rcv.requestCount.Load() > 1 + }, 10*time.Second, 5*time.Millisecond, "Should retry if RetryInfo is included into status details by the server.") +} + +func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td ptrace.Traces, ln net.Listener) { + rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false) + defer rcv.srv.GracefulStop() + // Ensure that initially there is no data in the receiver. + assert.EqualValues(t, 0, rcv.requestCount.Load()) + + // Clone the request and store as expected. + expectedData := td.Clone() + + // Resend the request, this should succeed. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + assert.NoError(t, exp.ConsumeTraces(ctx, td)) + cancel() + + // Wait until it is received. + assert.Eventually(t, func() bool { + return rcv.requestCount.Load() > 0 + }, 10*time.Second, 5*time.Millisecond) + + // Verify received span. + assert.EqualValues(t, 2, rcv.totalItems.Load()) + assert.EqualValues(t, expectedData, rcv.GetLastRequest()) +} + +func TestSendLogData(t *testing.T) { + // Start an OTLP-compatible receiver. + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + rcv := otlpLogsReceiverOnGRPCServer(ln) + // Also closes the connection. + defer rcv.srv.GracefulStop() + + // Start an OTLP exporter and point to the receiver. + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + } + cfg.Security = SecuritySettings{ + OAuthServiceURL: "https://asura.opsramp.net/auth/oauth/token?agent=true", + ClientId: "mamRxRJB796HYtWYxqeDzeEXCKSswnsr", + ClientSecret: "Da2achZqvHF7tKDaSP3FCkHE2PKcY6twRxwZEnEYQHc5GADgHy5VZDBxdeKhNbrw", + } + set := componenttest.NewNopExporterCreateSettings() + set.BuildInfo.Description = "Collector" + set.BuildInfo.Version = "1.2.3test" + exp, err := factory.CreateLogsExporter(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + host := componenttest.NewNopHost() + + assert.NoError(t, exp.Start(context.Background(), host)) + + // Ensure that initially there is no data in the receiver. + assert.EqualValues(t, 0, rcv.requestCount.Load()) + + // Send empty request. + ld := plog.NewLogs() + assert.NoError(t, exp.ConsumeLogs(context.Background(), ld)) + + // Wait until it is received. + assert.Eventually(t, func() bool { + return rcv.requestCount.Load() > 0 + }, 10*time.Second, 5*time.Millisecond) + + // Ensure it was received empty. + assert.EqualValues(t, 0, rcv.totalItems.Load()) + + // A request with 2 log entries. + ld = testdata.GenerateLogsTwoLogRecordsSameResource() + + err = exp.ConsumeLogs(context.Background(), ld) + assert.NoError(t, err) + + // Wait until it is received. + assert.Eventually(t, func() bool { + return rcv.requestCount.Load() > 1 + }, 10*time.Second, 5*time.Millisecond) + + // Verify received logs. + assert.EqualValues(t, 2, rcv.requestCount.Load()) + assert.EqualValues(t, 2, rcv.totalItems.Load()) + assert.EqualValues(t, ld, rcv.GetLastRequest()) + + md := rcv.GetMetadata() + require.Equal(t, len(md.Get("User-Agent")), 1) + require.Contains(t, md.Get("User-Agent")[0], "Collector/1.2.3test") +} + +func TestRegexp(t *testing.T) { + strExp := "my 344 id is 123456" + reStr := regexp.MustCompile(`\d+`) + repStr := "${1}HIDDEN$2" + output := reStr.ReplaceAllString(strExp, repStr) + assert.Equal(t, output, "my HIDDEN id is HIDDEN") +} + +func TestGetAuthToken(t *testing.T) { + cfg := SecuritySettings{ + OAuthServiceURL: "https://asura.opsramp.net/auth/oauth/token?agent=true", + ClientId: "mamRxRJB796HYtWYxqeDzeEXCKSswnsr", + ClientSecret: "Da2achZqvHF7tKDaSP3FCkHE2PKcY6twRxwZEnEYQHc5GADgHy5VZDBxdeKhNbrw", + } + token, err := getAuthToken(cfg) + assert.Nil(t, err) + fmt.Println(token) +} diff --git a/exporter/opsrampotlpexporter/testdata/config.yaml b/exporter/opsrampotlpexporter/testdata/config.yaml new file mode 100644 index 000000000000..80d10361258e --- /dev/null +++ b/exporter/opsrampotlpexporter/testdata/config.yaml @@ -0,0 +1,48 @@ +extensions: + nop: + +receivers: + nop: + +processors: + nop: + +exporters: + opsrampotlp: + endpoint: "1.2.3.4:1234" + compression: "gzip" + tls: + ca_file: /var/lib/mycert.pem + timeout: 10s + sending_queue: + enabled: true + num_consumers: 2 + queue_size: 10 + retry_on_failure: + enabled: true + initial_interval: 10s + max_interval: 60s + max_elapsed_time: 10m + auth: + authenticator: nop + headers: + "can you have a . here?": "F0000000-0000-0000-0000-000000000000" + header1: 234 + another: "somevalue" + keepalive: + time: 20s + timeout: 30s + permit_without_stream: true + balancer_name: "round_robin" + security: + oauth_service_url: "url" + client_id: "id" + client_secret: "secret" + +service: + extensions: [nop] + pipelines: + traces: + receivers: [nop] + processors: [nop] + exporters: [opsrampotlp] diff --git a/exporter/opsrampotlpexporter/testdata/test_cert.pem b/exporter/opsrampotlpexporter/testdata/test_cert.pem new file mode 100644 index 000000000000..b3842e597a23 --- /dev/null +++ b/exporter/opsrampotlpexporter/testdata/test_cert.pem @@ -0,0 +1,17 @@ +-----BEGIN CERTIFICATE----- +MIICpDCCAYwCCQC5oaFsqLW3GTANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls +b2NhbGhvc3QwHhcNMjEwNzE0MDAxMzU2WhcNMzEwNzEyMDAxMzU2WjAUMRIwEAYD +VQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDO +mKaE1qg5VLMwaUnSzufT23rRJFbuy/HDXwsH63yZVSsISQkGjkBYBgrqAMtVnsI/ +l4gXtBWkZtJFs68Sbo9ps3W0PdB5+d12R5NUNA1rkZtx3jtEN33dpGhifug/TIZe +7Zr0G1z6gNoaEezk0Jpg4KsH7QpIeHPRhIZMyWeqddgD/qL4/ukaU4NOORuF3WoT +oo2LpI3jUq66mz2N2Inq0V/OX7BYB4Ur6EtjWh2baiUuw9fq+oLUlgZd6ypnugC/ ++YfgYqvWtRntmEr0Z+O4Kz81P2IpH/0h1RFhWyK6thVGa9cx6aseCp3V2cMXfGfc +z4n3Uvz87v+bZvGbcse/AgMBAAEwDQYJKoZIhvcNAQELBQADggEBAAlvNBNoqXUQ +ohR0eozIHGeJ94U7WK5zXf2NSvmRlwHzHXvUq6GKd+8Bv1foMjI6OpSOZmjtRGsc +rWET1WjSyQddRfqYazhWp1IyYu5LfATwPS+RXJAkWixKVfG+Ta2x6u+aT/bSZwEg +NwRerc6pyqv5UG8Z7Pe1kAxbgOwZv5KXAewIgTSbEkmIp1Dg8GhGeWD5pjYNCkJV +Na2KMAUWP3PeQzdSBKmBNpsRUALuSTxb5u7pl+PA7FLInTtDeyZn8xpO1GPBhbJE +trDbmTbj5YexOXEaQtGtZ6fwRw2jnUm8nqtXozxIomnVTBO8vLmZAUgyJ71trRw0 +gE9tH5Ndlug= +-----END CERTIFICATE----- diff --git a/exporter/opsrampotlpexporter/testdata/test_key.pem b/exporter/opsrampotlpexporter/testdata/test_key.pem new file mode 100644 index 000000000000..dedfad3df6e3 --- /dev/null +++ b/exporter/opsrampotlpexporter/testdata/test_key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDOmKaE1qg5VLMw +aUnSzufT23rRJFbuy/HDXwsH63yZVSsISQkGjkBYBgrqAMtVnsI/l4gXtBWkZtJF +s68Sbo9ps3W0PdB5+d12R5NUNA1rkZtx3jtEN33dpGhifug/TIZe7Zr0G1z6gNoa +Eezk0Jpg4KsH7QpIeHPRhIZMyWeqddgD/qL4/ukaU4NOORuF3WoToo2LpI3jUq66 +mz2N2Inq0V/OX7BYB4Ur6EtjWh2baiUuw9fq+oLUlgZd6ypnugC/+YfgYqvWtRnt +mEr0Z+O4Kz81P2IpH/0h1RFhWyK6thVGa9cx6aseCp3V2cMXfGfcz4n3Uvz87v+b +ZvGbcse/AgMBAAECggEADeR39iDVKR3H+u5pl3JwZm+w35V4/w/ZzxB6FmtAcrMm +dKUspTM1onWtkDTDd5t4ZnxTG3zxo5+Cbkt571xd6na16Ivrk/g4aza+8n+Zk200 +LcEK7ThqD1h56H2uMmt78bA6pkWcx/+YKv6flndsmi0hcyP+eAcZirJFsa4teWna +P6rhI9zThc9OcecqGZIlmzJQ4cLbIO86QqkWW6yjKYg6riOb2g+i3e97ZngMCTcV +lni+sksLlXBNKPqh1AkiUFe4pInRBh4LGQ5rNSYswEqlQY0iW0u4Hs3HNou0On+8 +1T8m5wzKQ+23AN+vVRJ/MHssQiB/TPK92jXVgEz6eQKBgQD2GEb7NzDIxsAQZBQo +tt3jYitNcAEqMWeT7wxCMMue4wIrT6Fp6NuG5NMVqLglzx72m6TXg7YzZxPrAnlH +jblWI4sxwVC8BjjYyGud7qMuhUIZmI8aS9HuYW0ODSxkcpVVXd4HDUYKg7PafAkl +cj745E5KGD+qW44KASTTQ1SwRQKBgQDW6WLp/nPVPO5YEK4nzS7b1RRC8ypHiKd6 +LzhA2izgcsmO3F3Y5ZZ5rzeFbjgZiGFTUB/r1mgomI8kZyIGP1AN6o8oY9I89gHY +/DEEagIsFK5jAEoMeN0qbgqasOXpi+uUHCNidWa7OWOL9Rsh7dyVT54xcqMC2Qak +Vpoy5miiMwKBgQDuOHH9nF9M+5fQRhB9mQcRpWXlgBagkVKCkVR8fl+dXoIrCtpl +e1OGMNtki/42G1kNv3zCYm1tNMrDI5HjAf32tFF5yHguipdcwiXqq6aq0bQ6ssNT +4TFGYGkAwR/H3GNST5stmFvEsdjYFlmENiNfKyHd97spXZcReCn9l5/TQQKBgDRG +PpYWG4zBrmPjYskxonU8ZhpG1YDi34Hb3H4B06qgoSBLv9QTPD/K++FLxv+G6c1/ +DtSpqVo+iYrcPy1v1wQbisjTRv8nA5oI9c9SDcc1HJneJyTTfVBlxdSMtM/TBfFX +ys+XKO7fbbRMYVYmamIzJJJ4hOgba/8rRYSeANN7AoGBAMDdrT+ig3aDMratbAvY +lqsfN3AtxoZ+ZVQYyUbzTSZPZ/to9eNuBzhRKcQ3QfG95nrHb7OnWHa7+1kc4p/Q +jMgzJgRpajlES+F3CCMPgJIJg7Ev+yiSCJLP9ZOsC+E96bK265hUcDyCXwb3Wzmg +4L9sc1QsQW80QO/RnaEzGO51 +-----END PRIVATE KEY----- diff --git a/internal/components/components.go b/internal/components/components.go index 732ddcd2bef3..4efc790cbae3 100644 --- a/internal/components/components.go +++ b/internal/components/components.go @@ -15,6 +15,7 @@ package components // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/components" import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opsrampotlpexporter" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/loggingexporter" "go.opentelemetry.io/collector/exporter/otlpexporter" @@ -301,6 +302,7 @@ func Components() (component.Factories, error) { opencensusexporter.NewFactory(), otlpexporter.NewFactory(), otlphttpexporter.NewFactory(), + opsrampotlpexporter.NewFactory(), parquetexporter.NewFactory(), prometheusexporter.NewFactory(), prometheusremotewriteexporter.NewFactory(),