From b31f07089a535e493a74c43d6b88cdfe13082cb4 Mon Sep 17 00:00:00 2001 From: swar8080 Date: Sun, 31 Mar 2024 14:20:10 -0400 Subject: [PATCH 1/6] RabbitMQ exporter implementation --- .../rabbitmq-exporter-implementation.yaml | 27 ++ exporter/rabbitmqexporter/README.md | 18 +- exporter/rabbitmqexporter/config.go | 30 +- exporter/rabbitmqexporter/config_test.go | 48 ++- exporter/rabbitmqexporter/factory.go | 66 ++- exporter/rabbitmqexporter/factory_test.go | 21 + exporter/rabbitmqexporter/go.mod | 12 +- exporter/rabbitmqexporter/go.sum | 23 +- .../internal/publisher/client.go | 110 +++++ .../internal/publisher/publisher.go | 196 +++++++++ .../internal/publisher/publisher_test.go | 388 ++++++++++++++++++ exporter/rabbitmqexporter/marshaler.go | 45 ++ exporter/rabbitmqexporter/marshaler_test.go | 73 ++++ .../rabbitmqexporter/rabbitmq_exporter.go | 113 ++++- .../rabbitmq_exporter_test.go | 171 ++++++++ .../testdata/test-config.yaml | 30 +- 16 files changed, 1311 insertions(+), 60 deletions(-) create mode 100644 .chloggen/rabbitmq-exporter-implementation.yaml create mode 100644 exporter/rabbitmqexporter/internal/publisher/client.go create mode 100644 exporter/rabbitmqexporter/internal/publisher/publisher.go create mode 100644 exporter/rabbitmqexporter/internal/publisher/publisher_test.go create mode 100644 exporter/rabbitmqexporter/marshaler.go create mode 100644 exporter/rabbitmqexporter/marshaler_test.go create mode 100644 exporter/rabbitmqexporter/rabbitmq_exporter_test.go diff --git a/.chloggen/rabbitmq-exporter-implementation.yaml b/.chloggen/rabbitmq-exporter-implementation.yaml new file mode 100644 index 000000000000..d41b66387f5b --- /dev/null +++ b/.chloggen/rabbitmq-exporter-implementation.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: rabbitmqexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implements the RabbitMQ exporter + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [28891] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/rabbitmqexporter/README.md b/exporter/rabbitmqexporter/README.md index a396f98427fa..2ea62fd86f1c 100644 --- a/exporter/rabbitmqexporter/README.md +++ b/exporter/rabbitmqexporter/README.md @@ -19,16 +19,15 @@ The following settings can be configured: - `endpoint` (required, ex = amqp://localhost:5672): Endpoint to connect to RabbitMQ - `vhost` (optional): The RabbitMQ [virtual host](https://www.rabbitmq.com/docs/vhosts) to connect to - `auth`: - - `sasl`: Configuration if using SASL PLAIN authentication + - `plain`: Configuration if using SASL PLAIN authentication - `username` (required): username for authentication - - `password` (required): password for authentication - - `tls` (optional): TODO, need to add this + - `password`: password for authentication + - `tls` (optional): [TLS configuration](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/configtls.go#L32) - `routing`: - `routing_key` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): Routing key used to route exported messages to RabbitMQ consumers + - `exchange`: Name of the exchange used to route messages. If omitted, the [default exchange](https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-default) is used which routes to a queue with the same as the routing key. Only [direct exchanges](https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-direct) are currently supported. Note that this component does not handle queue creation or binding. - `durable` (default = true): Whether to instruct RabbitMQ to make messages [durable](https://www.rabbitmq.com/docs/queues#durability) by writing to disk - - `message_body_encoding`: (default = "otlp_proto"): The encoding of telemetry sent to RabbitMQ - - `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs. - - `otlp_json`: ** EXPERIMENTAL ** payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs. + - `encoding_extension`: (defaults to OTLP protobuf format): ID of the [encoding extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/encoding) to use to marshal data - `retry_on_failure`: - `enabled` (default = false) @@ -40,7 +39,12 @@ exporters: connection: endpoint: amqp://localhost:5672 auth: - sasl: + plain: username: user password: pass + encoding_extension: otlp_encoding/rabbitmq + +extensions: + otlp_encoding/rabbitmq: + protocol: otlp_json ``` \ No newline at end of file diff --git a/exporter/rabbitmqexporter/config.go b/exporter/rabbitmqexporter/config.go index 9aad0213d7a5..34bf766ad4c1 100644 --- a/exporter/rabbitmqexporter/config.go +++ b/exporter/rabbitmqexporter/config.go @@ -4,33 +4,42 @@ package rabbitmqexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter" import ( + "errors" + "time" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" + "go.opentelemetry.io/collector/config/configtls" ) type Config struct { Connection ConnectionConfig `mapstructure:"connection"` Routing RoutingConfig `mapstructure:"routing"` - MessageBodyEncoding string `mapstructure:"message_body_encoding"` + EncodingExtensionId *component.ID `mapstructure:"encoding_extension"` Durable bool `mapstructure:"durable"` RetrySettings configretry.BackOffConfig `mapstructure:"retry_on_failure"` } type ConnectionConfig struct { - Endpoint string `mapstructure:"endpoint"` - VHost string `mapstructure:"vhost"` - Auth AuthConfig `mapstructure:"auth"` + Endpoint string `mapstructure:"endpoint"` + VHost string `mapstructure:"vhost"` + TLSConfig *configtls.ClientConfig `mapstructure:"tls"` + Auth AuthConfig `mapstructure:"auth"` + ConnectionTimeout time.Duration `mapstructure:"connection_timeout"` + Heartbeat time.Duration `mapstructure:"heartbeat"` + PublishConfirmationTimeout time.Duration `mapstructure:"publish_confirmation_timeout"` } type RoutingConfig struct { + Exchange string `mapstructure:"exchange"` RoutingKey string `mapstructure:"routing_key"` } type AuthConfig struct { - SASL SASLConfig `mapstructure:"sasl"` + Plain PlainAuth `mapstructure:"plain"` } -type SASLConfig struct { +type PlainAuth struct { Username string `mapstructure:"username"` Password string `mapstructure:"password"` } @@ -39,5 +48,14 @@ var _ component.Config = (*Config)(nil) // Validate checks if the exporter configuration is valid func (cfg *Config) Validate() error { + if cfg.Connection.Endpoint == "" { + return errors.New("connection.endpoint is required") + } + + // Password-less users are possible so only validate username + if cfg.Connection.Auth.Plain.Username == "" { + return errors.New("connection.auth.plain.username is required") + } + return nil } diff --git a/exporter/rabbitmqexporter/config_test.go b/exporter/rabbitmqexporter/config_test.go index 22cb41018d68..41b04db7c342 100644 --- a/exporter/rabbitmqexporter/config_test.go +++ b/exporter/rabbitmqexporter/config_test.go @@ -4,18 +4,23 @@ package rabbitmqexporter import ( + "errors" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" + "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/confmap/confmaptest" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/metadata" ) +var encodingComponentId = component.NewIDWithName(component.MustNewType("otlp_encoding"), "rabbitmq123") + func TestLoadConfig(t *testing.T) { t.Parallel() @@ -23,30 +28,45 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) tests := []struct { - id component.ID - expected component.Config + id component.ID + expected component.Config + errorMessage string }{ { - id: component.NewIDWithName(metadata.Type, ""), - expected: createDefaultConfig().(*Config), + id: component.NewIDWithName(metadata.Type, "missing_endpoint"), + errorMessage: "connection.endpoint is required", + }, + { + id: component.NewIDWithName(metadata.Type, "missing_plainauth_username"), + errorMessage: "connection.auth.plain.username is required", }, { id: component.NewIDWithName(metadata.Type, "all_fields"), expected: &Config{ Connection: ConnectionConfig{ - Endpoint: "amqp://localhost:5672", + Endpoint: "amqps://localhost:5672", VHost: "vhost1", Auth: AuthConfig{ - SASL: SASLConfig{ + Plain: PlainAuth{ Username: "user", Password: "pass", }, }, + TLSConfig: &configtls.ClientConfig{ + TLSSetting: configtls.Config{ + CAFile: "cert123", + }, + Insecure: true, + }, + ConnectionTimeout: time.Millisecond, + Heartbeat: time.Millisecond * 2, + PublishConfirmationTimeout: time.Millisecond * 3, }, Routing: RoutingConfig{ + Exchange: "amq.direct", RoutingKey: "custom_routing_key", }, - MessageBodyEncoding: "otlp_json", + EncodingExtensionId: &encodingComponentId, Durable: false, RetrySettings: configretry.BackOffConfig{ Enabled: true, @@ -60,14 +80,16 @@ func TestLoadConfig(t *testing.T) { Endpoint: "amqp://localhost:5672", VHost: "", Auth: AuthConfig{ - SASL: SASLConfig{ + Plain: PlainAuth{ Username: "user", Password: "pass", }, }, + ConnectionTimeout: defaultConnectionTimeout, + Heartbeat: defaultConnectionHeartbeat, + PublishConfirmationTimeout: defaultPublishConfirmationTimeout, }, - MessageBodyEncoding: "otlp_proto", - Durable: true, + Durable: true, RetrySettings: configretry.BackOffConfig{ Enabled: false, }, @@ -84,6 +106,12 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) require.NoError(t, component.UnmarshalConfig(sub, cfg)) + if tt.expected == nil { + err = errors.Join(err, component.ValidateConfig(cfg)) + assert.ErrorContains(t, err, tt.errorMessage) + return + } + assert.NoError(t, component.ValidateConfig(cfg)) assert.Equal(t, tt.expected, cfg) }) diff --git a/exporter/rabbitmqexporter/factory.go b/exporter/rabbitmqexporter/factory.go index 23804efdf432..19267e1ba119 100644 --- a/exporter/rabbitmqexporter/factory.go +++ b/exporter/rabbitmqexporter/factory.go @@ -5,7 +5,10 @@ package rabbitmqexporter // import "github.com/open-telemetry/opentelemetry-coll import ( "context" + "crypto/tls" + "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" @@ -16,7 +19,17 @@ import ( ) const ( - defaultEncoding = "otlp_proto" + defaultConnectionTimeout = time.Second * 10 + defaultConnectionHeartbeat = time.Second * 5 + defaultPublishConfirmationTimeout = time.Second * 5 + + spansRoutingKey = "otlp_spans" + metricsRoutingKey = "otlp_metrics" + logsRoutingKey = "otlp_logs" + + spansConnectionName = "otel-collector-spans" + metricsConnectionName = "otel-collector-metrics" + logsConnectionName = "otel-collector-logs" ) func NewFactory() exporter.Factory { @@ -34,9 +47,13 @@ func createDefaultConfig() component.Config { Enabled: false, } return &Config{ - MessageBodyEncoding: defaultEncoding, - Durable: true, - RetrySettings: retrySettings, + Durable: true, + RetrySettings: retrySettings, + Connection: ConnectionConfig{ + ConnectionTimeout: defaultConnectionTimeout, + Heartbeat: defaultConnectionHeartbeat, + PublishConfirmationTimeout: defaultPublishConfirmationTimeout, + }, } } @@ -46,13 +63,15 @@ func createTracesExporter( cfg component.Config, ) (exporter.Traces, error) { config := cfg.(*Config) - r := newRabbitmqExporter(config, set.TelemetrySettings) + + routingKey := getRoutingKeyOrDefault(config, spansRoutingKey) + r := newRabbitmqExporter(config, set.TelemetrySettings, newPublisherFactory(set), newTLSFactory(config), routingKey, spansConnectionName) return exporterhelper.NewTracesExporter( ctx, set, cfg, - r.pushTraces, + r.publishTraces, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithStart(r.start), exporterhelper.WithShutdown(r.shutdown), @@ -66,13 +85,15 @@ func createMetricsExporter( cfg component.Config, ) (exporter.Metrics, error) { config := (cfg.(*Config)) - r := newRabbitmqExporter(config, set.TelemetrySettings) + + routingKey := getRoutingKeyOrDefault(config, metricsRoutingKey) + r := newRabbitmqExporter(config, set.TelemetrySettings, newPublisherFactory(set), newTLSFactory(config), routingKey, metricsConnectionName) return exporterhelper.NewMetricsExporter( ctx, set, cfg, - r.pushMetrics, + r.publishMetrics, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithStart(r.start), exporterhelper.WithShutdown(r.shutdown), @@ -86,16 +107,41 @@ func createLogsExporter( cfg component.Config, ) (exporter.Logs, error) { config := (cfg.(*Config)) - r := newRabbitmqExporter(config, set.TelemetrySettings) + + routingKey := getRoutingKeyOrDefault(config, logsRoutingKey) + r := newRabbitmqExporter(config, set.TelemetrySettings, newPublisherFactory(set), newTLSFactory(config), routingKey, logsConnectionName) return exporterhelper.NewLogsExporter( ctx, set, cfg, - r.pushLogs, + r.publishLogs, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithStart(r.start), exporterhelper.WithShutdown(r.shutdown), exporterhelper.WithRetry(config.RetrySettings), ) } + +func getRoutingKeyOrDefault(config *Config, fallback string) string { + routingKey := fallback + if config.Routing.RoutingKey != "" { + routingKey = config.Routing.RoutingKey + } + return routingKey +} + +func newPublisherFactory(set exporter.CreateSettings) publisherFactory { + return func(dialConfig publisher.DialConfig) (publisher.Publisher, error) { + return publisher.NewConnection(set.Logger, publisher.NewAmqpClient(), dialConfig) + } +} + +func newTLSFactory(config *Config) tlsFactory { + if config.Connection.TLSConfig != nil { + return config.Connection.TLSConfig.LoadTLSConfig + } + return func() (*tls.Config, error) { + return nil, nil + } +} diff --git a/exporter/rabbitmqexporter/factory_test.go b/exporter/rabbitmqexporter/factory_test.go index a5787dd6881b..4ea26b28d78b 100644 --- a/exporter/rabbitmqexporter/factory_test.go +++ b/exporter/rabbitmqexporter/factory_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter/exportertest" ) @@ -45,3 +46,23 @@ func TestCreateLogsExporter(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, te) } + +func TestCreateExporterWithCustomRoutingKey(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.Routing.RoutingKey = "custom_routing_key" + + te, err := factory.CreateLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + assert.NoError(t, err) + assert.NotNil(t, te) +} + +func TestCreateExporterWithTLSSettings(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.Connection.TLSConfig = &configtls.ClientConfig{} + + te, err := factory.CreateLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + assert.NoError(t, err) + assert.NotNil(t, te) +} diff --git a/exporter/rabbitmqexporter/go.mod b/exporter/rabbitmqexporter/go.mod index bfcac9400582..bbd567655b01 100644 --- a/exporter/rabbitmqexporter/go.mod +++ b/exporter/rabbitmqexporter/go.mod @@ -3,9 +3,12 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbit go 1.21 require ( + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.97.0 + github.com/rabbitmq/amqp091-go v1.9.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.97.1-0.20240327181407-1038b67c85a0 go.opentelemetry.io/collector/config/configretry v0.97.1-0.20240327181407-1038b67c85a0 + go.opentelemetry.io/collector/config/configtls v0.97.0 go.opentelemetry.io/collector/confmap v0.97.1-0.20240327181407-1038b67c85a0 go.opentelemetry.io/collector/consumer v0.97.1-0.20240327181407-1038b67c85a0 go.opentelemetry.io/collector/exporter v0.97.1-0.20240327181407-1038b67c85a0 @@ -13,13 +16,15 @@ require ( go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.27.0 ) require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect @@ -39,7 +44,9 @@ require ( github.com/prometheus/client_model v0.6.0 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/collector v0.97.1-0.20240327181407-1038b67c85a0 // indirect + go.opentelemetry.io/collector/config/configopaque v1.4.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240327181407-1038b67c85a0 // indirect go.opentelemetry.io/collector/extension v0.97.1-0.20240327181407-1038b67c85a0 // indirect go.opentelemetry.io/collector/receiver v0.97.1-0.20240327181407-1038b67c85a0 // indirect @@ -48,7 +55,6 @@ require ( go.opentelemetry.io/otel/sdk v1.24.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/net v0.21.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect @@ -57,3 +63,5 @@ require ( google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal diff --git a/exporter/rabbitmqexporter/go.sum b/exporter/rabbitmqexporter/go.sum index 8ae2888c1e8a..d9d989d520af 100644 --- a/exporter/rabbitmqexporter/go.sum +++ b/exporter/rabbitmqexporter/go.sum @@ -1,12 +1,14 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -35,8 +37,11 @@ github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPgh github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU= github.com/knadh/koanf/v2 v2.1.0 h1:eh4QmHHBuU8BybfIJ8mB8K8gsGCD/AUQTdwGq/GzId8= github.com/knadh/koanf/v2 v2.1.0/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= @@ -58,10 +63,17 @@ github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSz github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -70,10 +82,14 @@ go.opentelemetry.io/collector v0.97.1-0.20240327181407-1038b67c85a0 h1:dvxsnQ+nB go.opentelemetry.io/collector v0.97.1-0.20240327181407-1038b67c85a0/go.mod h1:V6xquYAaO2VHVu4DBK28JYuikRdZajh7DH5Vl/Y8NiA= go.opentelemetry.io/collector/component v0.97.1-0.20240327181407-1038b67c85a0 h1:OBXZrNlbQtCfpcqfVmKfsiqEKket/cHm61e4l2hfxuo= go.opentelemetry.io/collector/component v0.97.1-0.20240327181407-1038b67c85a0/go.mod h1:F/m3HMlkb16RKI7wJjgbECK1IZkAcmB8bu7yD8XOkwM= +go.opentelemetry.io/collector/config/configopaque v1.4.0 h1:5KgD9oLN+N07HqDsLzUrU0mE2pC8cMhrCSC1Nf8CEO4= +go.opentelemetry.io/collector/config/configopaque v1.4.0/go.mod h1:7Qzo69x7i+FaNELeA9jmZtVvfnR5lE6JYa5YEOCJPFQ= go.opentelemetry.io/collector/config/configretry v0.97.1-0.20240327181407-1038b67c85a0 h1:kkApwmm9g5yXaKwVR+9gWTv/nOdziAHX3tcVrAv+1Bc= go.opentelemetry.io/collector/config/configretry v0.97.1-0.20240327181407-1038b67c85a0/go.mod h1:s7A6ZGxK8bxqidFzwbr2pITzbsB2qf+aeHEDQDcanV8= go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240327181407-1038b67c85a0 h1:n6gNCKxrCs3hD+jafL93JdtPVl05p+C5PecoNE7YUrw= go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240327181407-1038b67c85a0/go.mod h1:YV5PaOdtnU1xRomPcYqoHmyCr48tnaAREeGO96EZw8o= +go.opentelemetry.io/collector/config/configtls v0.97.0 h1:wmXj/rKQUGMZzbHVCTyB+xUWImsGxnLqhivwjBE0FdI= +go.opentelemetry.io/collector/config/configtls v0.97.0/go.mod h1:ev/fMI6hm1WTSHHEAEoVjF3RZj0qf38E/XO5itFku7k= go.opentelemetry.io/collector/confmap v0.97.1-0.20240327181407-1038b67c85a0 h1:Cm5WDKNnmKLZmiAzodv3LLodAN3fAZFl+Q6jek/K6xU= go.opentelemetry.io/collector/confmap v0.97.1-0.20240327181407-1038b67c85a0/go.mod h1:AnJmZcZoOLuykSXGiAf3shi11ZZk5ei4tZd9dDTTpWE= go.opentelemetry.io/collector/consumer v0.97.1-0.20240327181407-1038b67c85a0 h1:13pZ9wIF0ogiuAk+KVV8ekEGyYXqS44PV9tIKbAIzLc= @@ -98,6 +114,7 @@ go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9os go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0= go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -144,7 +161,9 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/exporter/rabbitmqexporter/internal/publisher/client.go b/exporter/rabbitmqexporter/internal/publisher/client.go new file mode 100644 index 000000000000..905d7da564a1 --- /dev/null +++ b/exporter/rabbitmqexporter/internal/publisher/client.go @@ -0,0 +1,110 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package publisher // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" + +import ( + "context" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type AmqpClient interface { + DialConfig(url string, config amqp.Config) (Connection, error) +} + +type Connection interface { + IsClosed() bool + Channel() (Channel, error) + NotifyClose(receiver chan *amqp.Error) chan *amqp.Error + Close() error +} + +type Channel interface { + Confirm(noWait bool) error + PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (DeferredConfirmation, error) + IsClosed() bool + Close() error +} + +type DeferredConfirmation interface { + Done() <-chan struct{} + Acked() bool +} + +func NewAmqpClient() AmqpClient { + return &client{} +} + +type client struct{} + +type connectionHolder struct { + connection *amqp.Connection +} + +type channelHolder struct { + channel *amqp.Channel +} + +type deferredConfirmationHolder struct { + confirmation *amqp.DeferredConfirmation +} + +func (*client) DialConfig(url string, config amqp.Config) (Connection, error) { + con, err := amqp.DialConfig(url, config) + if err != nil { + return nil, err + } + + return &connectionHolder{ + connection: con, + }, nil +} + +func (c *connectionHolder) Channel() (Channel, error) { + channel, err := c.connection.Channel() + if err != nil { + return nil, err + } + return &channelHolder{channel: channel}, nil +} + +func (c *connectionHolder) IsClosed() bool { + return c.connection.IsClosed() +} + +func (c *connectionHolder) NotifyClose(receiver chan *amqp.Error) chan *amqp.Error { + return c.connection.NotifyClose(receiver) +} + +func (c *connectionHolder) Close() error { + return c.connection.Close() +} + +func (c *channelHolder) Confirm(noWait bool) error { + return c.channel.Confirm(noWait) +} + +func (c *channelHolder) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (DeferredConfirmation, error) { + confirmation, err := c.channel.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg) + if err != nil { + return nil, err + } + return &deferredConfirmationHolder{confirmation: confirmation}, nil +} + +func (c *channelHolder) IsClosed() bool { + return c.channel.IsClosed() +} + +func (c *channelHolder) Close() error { + return c.channel.Close() +} + +func (d *deferredConfirmationHolder) Done() <-chan struct{} { + return d.confirmation.Done() +} + +func (d *deferredConfirmationHolder) Acked() bool { + return d.confirmation.Acked() +} diff --git a/exporter/rabbitmqexporter/internal/publisher/publisher.go b/exporter/rabbitmqexporter/internal/publisher/publisher.go new file mode 100644 index 000000000000..c9d3cfe1382c --- /dev/null +++ b/exporter/rabbitmqexporter/internal/publisher/publisher.go @@ -0,0 +1,196 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package publisher // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "sync" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "go.uber.org/zap" +) + +type DialConfig struct { + Url string + Durable bool + Vhost string + Auth amqp.Authentication + ConnectionTimeout time.Duration + Heartbeat time.Duration + PublishConfirmationTimeout time.Duration + TLS *tls.Config + ConnectionName string +} + +type Message struct { + Exchange string + RoutingKey string + Body []byte +} + +func NewConnection(logger *zap.Logger, client AmqpClient, config DialConfig) (Publisher, error) { + p := publisher{ + logger: logger, + client: client, + config: config, + connLock: &sync.Mutex{}, + connectionErrors: make(chan *amqp.Error, 1), + } + + p.connLock.Lock() + defer p.connLock.Unlock() + err := p.connect() + + return &p, err +} + +type Publisher interface { + Publish(ctx context.Context, message Message) error + Close() error +} + +type publisher struct { + logger *zap.Logger + client AmqpClient + config DialConfig + connLock *sync.Mutex + connection Connection + connectionErrors chan *amqp.Error +} + +func (p *publisher) Publish(ctx context.Context, message Message) error { + err := p.reconnectIfUnhealthy() + if err != nil { + return err + } + + // Create a new amqp channel for publishing messages and request that the broker confirms delivery. + // This could later be optimized to re-use channels which avoids repeated network calls to create and close them. + // Concurrency-control through something like a resource pool would be necessary since aqmp channels are not thread safe. + channel, err := p.connection.Channel() + defer func(channel Channel) { + if channel != nil { + err := channel.Close() + if err != nil { + p.logger.Warn("Failed closing channel", zap.Error(err)) + } + } + }(channel) + if err != nil { + p.logger.Error("Error creating AMQP channel") + return err + } + err = channel.Confirm(false) + if err != nil { + p.logger.Error("Error enabling channel confirmation mode") + return err + } + + // Send the message + deliveryMode := amqp.Transient + if p.config.Durable { + deliveryMode = amqp.Persistent + } + + confirmation, err := channel.PublishWithDeferredConfirmWithContext(ctx, message.Exchange, message.RoutingKey, true, false, amqp.Publishing{ + Body: message.Body, + DeliveryMode: deliveryMode, + }) + + if err != nil { + err = errors.Join(errors.New("error publishing message"), err) + return err + } + + // Wait for async confirmation of the message + select { + case <-confirmation.Done(): + if confirmation.Acked() { + p.logger.Debug("Received ack") + return nil + } + p.logger.Warn("Received nack from rabbitmq publishing confirmation") + err := errors.New("received nack from rabbitmq publishing confirmation") + return err + + case <-time.After(p.config.PublishConfirmationTimeout): + p.logger.Warn("Timeout waiting for publish confirmation", zap.Duration("timeout", p.config.ConnectionTimeout)) + err := fmt.Errorf("timeout waiting for publish confirmation after %s", p.config.PublishConfirmationTimeout) + return err + } +} + +func (p *publisher) reconnectIfUnhealthy() error { + p.connLock.Lock() + defer p.connLock.Unlock() + + hasConnectionError := false + select { + case err := <-p.connectionErrors: + hasConnectionError = true + p.logger.Info("Received connection error, will retry restoring unhealthy connection", zap.Error(err)) + default: + break + } + + if hasConnectionError || !p.isConnected() { + if p.isConnected() { + err := p.connection.Close() + if err != nil { + p.logger.Warn("Error closing unhealthy connection", zap.Error(err)) + } + } + + if err := p.connect(); err != nil { + return errors.Join(errors.New("failed attempt at restoring unhealthy connection"), err) + } else { + p.logger.Info("Successfully restored unhealthy rabbitmq connection") + } + } + + return nil +} + +func (p *publisher) connect() error { + p.logger.Debug("Connecting to rabbitmq") + + properties := amqp.Table{} + properties.SetClientConnectionName(p.config.ConnectionName) + + connection, err := p.client.DialConfig(p.config.Url, amqp.Config{ + SASL: []amqp.Authentication{p.config.Auth}, + Vhost: p.config.Vhost, + Heartbeat: p.config.Heartbeat, + Dial: amqp.DefaultDial(p.config.ConnectionTimeout), + Properties: properties, + TLSClientConfig: p.config.TLS, + }) + if connection != nil { + p.connection = connection + } + if err != nil { + return err + } + + // Goal is to lazily restore the connection so this needs to be buffered to avoid blocking on asynchronous amqp errors. + // Also re-create this channel each time because apparently the amqp library can close it + p.connectionErrors = make(chan *amqp.Error, 1) + p.connection.NotifyClose(p.connectionErrors) + return nil +} + +func (p *publisher) Close() error { + if p.isConnected() { + return p.connection.Close() + } + return nil +} + +func (p *publisher) isConnected() bool { + return p.connection != nil && !p.connection.IsClosed() +} diff --git a/exporter/rabbitmqexporter/internal/publisher/publisher_test.go b/exporter/rabbitmqexporter/internal/publisher/publisher_test.go new file mode 100644 index 000000000000..cd1b1f028598 --- /dev/null +++ b/exporter/rabbitmqexporter/internal/publisher/publisher_test.go @@ -0,0 +1,388 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package publisher + +import ( + "context" + "errors" + "testing" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +const ( + connectUrl = "amqp://localhost" + exchange = "amq.direct" + routingKey = "some_routing_key" +) + +func TestConnectAndClose(t *testing.T) { + client := mockClient{} + connection := mockConnection{} + dialConfig := DialConfig{ + Url: connectUrl, + } + + // Start the connection successfully + client.On("DialConfig", connectUrl, mock.Anything).Return(&connection, nil) + connection.On("NotifyClose", mock.Anything).Return(make(chan *amqp.Error)) + + publisher, err := NewConnection(zap.NewNop(), &client, dialConfig) + + require.NoError(t, err) + client.AssertExpectations(t) + + // Close the connection + connection.On("IsClosed").Return(false) + connection.On("Close").Return(nil) + + err = publisher.Close() + require.NoError(t, err) + client.AssertExpectations(t) + connection.AssertExpectations(t) +} + +func TestConnectionErrorAndClose(t *testing.T) { + client := mockClient{} + dialConfig := DialConfig{ + Url: connectUrl, + } + + client.On("DialConfig", connectUrl, mock.Anything).Return(nil, errors.New("simulated connection error")) + publisher, err := NewConnection(zap.NewNop(), &client, dialConfig) + + assert.EqualError(t, err, "simulated connection error") + + err = publisher.Close() + require.NoError(t, err) + + client.AssertExpectations(t) +} + +func TestPublishAckedWithinTimeout(t *testing.T) { + client, connection, channel, confirmation := setupMocksForSuccessfulPublish() + + publisher, err := NewConnection(zap.NewNop(), client, makeDialConfig()) + require.NoError(t, err) + + err = publisher.Publish(context.Background(), makePublishMessage()) + + require.NoError(t, err) + client.AssertExpectations(t) + connection.AssertExpectations(t) + channel.AssertExpectations(t) + confirmation.AssertExpectations(t) +} + +func TestPublishNackedWithinTimeout(t *testing.T) { + client, connection, channel, confirmation := setupMocksForSuccessfulPublish() + + resetCall(confirmation.ExpectedCalls, "Acked", t) + confirmation.On("Acked").Return(false) + + publisher, err := NewConnection(zap.NewNop(), client, makeDialConfig()) + require.NoError(t, err) + + err = publisher.Publish(context.Background(), makePublishMessage()) + + assert.EqualError(t, err, "received nack from rabbitmq publishing confirmation") + client.AssertExpectations(t) + connection.AssertExpectations(t) + channel.AssertExpectations(t) + confirmation.AssertExpectations(t) +} + +func TestPublishTimeoutBeforeAck(t *testing.T) { + client, connection, channel, confirmation := setupMocksForSuccessfulPublish() + + resetCall(confirmation.ExpectedCalls, "Done", t) + resetCall(confirmation.ExpectedCalls, "Acked", t) + emptyConfirmationChan := make(<-chan struct{}) + confirmation.On("Done").Return(emptyConfirmationChan) + + publisher, err := NewConnection(zap.NewNop(), client, makeDialConfig()) + require.NoError(t, err) + + err = publisher.Publish(context.Background(), makePublishMessage()) + + assert.EqualError(t, err, "timeout waiting for publish confirmation after 20ms") + client.AssertExpectations(t) + connection.AssertExpectations(t) + channel.AssertExpectations(t) + confirmation.AssertExpectations(t) +} + +func TestPublishTwiceReusingSameConnection(t *testing.T) { + client, connection, channel, confirmation := setupMocksForSuccessfulPublish() + + // Re-use same chan to allow ACKing both publishes + confirmationChan := make(chan struct{}, 2) + confirmationChan <- struct{}{} + confirmationChan <- struct{}{} + var confirmationChanRet <-chan struct{} = confirmationChan + resetCall(confirmation.ExpectedCalls, "Done", t) + confirmation.On("Done").Return(confirmationChanRet) + + publisher, err := NewConnection(zap.NewNop(), client, makeDialConfig()) + require.NoError(t, err) + + err = publisher.Publish(context.Background(), makePublishMessage()) + require.NoError(t, err) + err = publisher.Publish(context.Background(), makePublishMessage()) + require.NoError(t, err) + + client.AssertNumberOfCalls(t, "DialConfig", 1) + client.AssertExpectations(t) + connection.AssertExpectations(t) + channel.AssertExpectations(t) + confirmation.AssertExpectations(t) +} + +func TestRestoreUnhealthyConnectionDuringPublish(t *testing.T) { + client, connection, channel, confirmation := setupMocksForSuccessfulPublish() + + // Capture the channel that the amqp library uses to notify about connection issues so that we can simulate the notification + resetCall(connection.ExpectedCalls, "NotifyClose", t) + var connectionErrChan chan *amqp.Error + connection.On("NotifyClose", mock.Anything).Return(make(chan *amqp.Error)).Run(func(args mock.Arguments) { + connectionErrChan = args.Get(0).(chan *amqp.Error) + }) + + publisher, err := NewConnection(zap.NewNop(), client, makeDialConfig()) + require.NoError(t, err) + + connectionErrChan <- amqp.ErrClosed + connection.On("Close").Return(nil) + + err = publisher.Publish(context.Background(), makePublishMessage()) + + require.NoError(t, err) + client.AssertNumberOfCalls(t, "DialConfig", 2) // Connected twice + client.AssertExpectations(t) + connection.AssertExpectations(t) + connection.AssertNumberOfCalls(t, "Close", 1) + channel.AssertExpectations(t) + confirmation.AssertExpectations(t) +} + +// Tests code path where connection is closed right after checking the connection error channel +func TestRestoreClosedConnectionDuringPublish(t *testing.T) { + client, connection, channel, confirmation := setupMocksForSuccessfulPublish() + + publisher, err := NewConnection(zap.NewNop(), client, makeDialConfig()) + require.NoError(t, err) + + resetCall(connection.ExpectedCalls, "IsClosed", t) + connection.On("IsClosed").Return(true) + + err = publisher.Publish(context.Background(), makePublishMessage()) + require.NoError(t, err) + client.AssertNumberOfCalls(t, "DialConfig", 2) // Connected twice + client.AssertExpectations(t) + connection.AssertExpectations(t) + channel.AssertExpectations(t) + confirmation.AssertExpectations(t) +} + +func TestFailRestoreConnectionDuringPublishing(t *testing.T) { + client, connection, _, _ := setupMocksForSuccessfulPublish() + + publisher, err := NewConnection(zap.NewNop(), client, makeDialConfig()) + require.NoError(t, err) + client.AssertNumberOfCalls(t, "DialConfig", 1) + + resetCall(connection.ExpectedCalls, "IsClosed", t) + connection.On("IsClosed").Return(true) + + resetCall(client.ExpectedCalls, "DialConfig", t) + client.On("DialConfig", connectUrl, mock.Anything).Return(nil, errors.New("simulated connection error")) + + err = publisher.Publish(context.Background(), makePublishMessage()) + assert.EqualError(t, err, "failed attempt at restoring unhealthy connection\nsimulated connection error") + client.AssertNumberOfCalls(t, "DialConfig", 2) // Tried reconnecting +} + +func TestErrCreatingChannel(t *testing.T) { + client, connection, _, _ := setupMocksForSuccessfulPublish() + + resetCall(connection.ExpectedCalls, "Channel", t) + connection.On("Channel").Return(nil, errors.New("simulated error creating channel")) + + publisher, err := NewConnection(zap.NewNop(), client, makeDialConfig()) + require.NoError(t, err) + + err = publisher.Publish(context.Background(), makePublishMessage()) + assert.EqualError(t, err, "simulated error creating channel") +} + +func TestErrSettingChannelConfirmMode(t *testing.T) { + client, _, channel, _ := setupMocksForSuccessfulPublish() + + resetCall(channel.ExpectedCalls, "Confirm", t) + channel.On("Confirm", false).Return(errors.New("simulated error setting channel confirm mode")) + + publisher, err := NewConnection(zap.NewNop(), client, makeDialConfig()) + require.NoError(t, err) + + err = publisher.Publish(context.Background(), makePublishMessage()) + assert.EqualError(t, err, "simulated error setting channel confirm mode") +} + +func TestErrPublishing(t *testing.T) { + client, connection, _, _ := setupMocksForSuccessfulPublish() + + // resetCall(channel.ExpectedCalls, "PublishWithDeferredConfirmWithContext") doesn't work so need to recreate the mock + channel := mockChannel{} + channel.On("Confirm", false).Return(nil) + channel.On("PublishWithDeferredConfirmWithContext", mock.Anything, exchange, routingKey, true, false, mock.MatchedBy(isPersistentDeliverMode)).Return(nil, errors.New("simulated error publishing")) + channel.On("Close").Return(nil) + resetCall(connection.ExpectedCalls, "Channel", t) + connection.On("Channel").Return(&channel, nil) + + publisher, err := NewConnection(zap.NewNop(), client, makeDialConfig()) + require.NoError(t, err) + + err = publisher.Publish(context.Background(), makePublishMessage()) + assert.EqualError(t, err, "error publishing message\nsimulated error publishing") +} + +func setupMocksForSuccessfulPublish() (*mockClient, *mockConnection, *mockChannel, *mockDeferredConfirmation) { + client := mockClient{} + connection := mockConnection{} + channel := mockChannel{} + confirmation := mockDeferredConfirmation{} + + client.On("DialConfig", mock.Anything, mock.Anything).Return(&connection, nil) + connection.On("NotifyClose", mock.Anything).Return(make(chan *amqp.Error)) + connection.On("Channel").Return(&channel, nil) + connection.On("IsClosed").Return(false) + + channel.On("Confirm", false).Return(nil) + channel.On("PublishWithDeferredConfirmWithContext", mock.Anything, exchange, routingKey, true, false, mock.MatchedBy(isPersistentDeliverMode)).Return(&confirmation, nil) + channel.On("Close").Return(nil) + + confirmationChan := make(chan struct{}, 1) + confirmationChan <- struct{}{} + var confirmationChanRet <-chan struct{} = confirmationChan + confirmation.On("Done").Return(confirmationChanRet) + confirmation.On("Acked").Return(true) + + return &client, &connection, &channel, &confirmation +} + +func isPersistentDeliverMode(p amqp.Publishing) bool { + return p.DeliveryMode == amqp.Persistent +} + +func resetCall(calls []*mock.Call, methodName string, t *testing.T) { + for _, call := range calls { + if call.Method == methodName { + call.Unset() + return + } + } + t.FailNow() +} + +type mockClient struct { + mock.Mock +} + +func (m *mockClient) DialConfig(url string, config amqp.Config) (Connection, error) { + args := m.Called(url, config) + + if connection := args.Get(0); connection != nil { + return connection.(Connection), args.Error(1) + } + return nil, args.Error(1) +} + +type mockConnection struct { + mock.Mock +} + +func (m *mockConnection) IsClosed() bool { + args := m.Called() + return args.Bool(0) +} + +func (m *mockConnection) Channel() (Channel, error) { + args := m.Called() + if channel := args.Get(0); channel != nil { + return channel.(Channel), args.Error(1) + } + return nil, args.Error(1) +} + +func (m *mockConnection) NotifyClose(receiver chan *amqp.Error) chan *amqp.Error { + args := m.Called(receiver) + return args.Get(0).(chan *amqp.Error) +} + +func (m *mockConnection) Close() error { + args := m.Called() + return args.Error(0) +} + +type mockChannel struct { + mock.Mock +} + +func (m *mockChannel) Confirm(noWait bool) error { + args := m.Called(noWait) + return args.Error(0) +} + +func (m *mockChannel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (DeferredConfirmation, error) { + args := m.Called(ctx, exchange, key, mandatory, immediate, msg) + if confirmation := args.Get(0); confirmation != nil { + return confirmation.(DeferredConfirmation), args.Error(1) + } + return nil, args.Error(1) +} + +func (m *mockChannel) IsClosed() bool { + args := m.Called() + return args.Bool(0) +} + +func (m *mockChannel) Close() error { + args := m.Called() + return args.Error(0) +} + +type mockDeferredConfirmation struct { + mock.Mock +} + +func (m *mockDeferredConfirmation) Done() <-chan struct{} { + args := m.Called() + return args.Get(0).(<-chan struct{}) +} + +func (m *mockDeferredConfirmation) Acked() bool { + args := m.Called() + return args.Bool(0) +} + +func makePublishMessage() Message { + return Message{ + Exchange: exchange, + RoutingKey: routingKey, + Body: make([]byte, 1), + } +} + +func makeDialConfig() DialConfig { + return DialConfig{ + Url: connectUrl, + PublishConfirmationTimeout: time.Millisecond * 20, + Durable: true, + } +} diff --git a/exporter/rabbitmqexporter/marshaler.go b/exporter/rabbitmqexporter/marshaler.go new file mode 100644 index 000000000000..e477611a4e48 --- /dev/null +++ b/exporter/rabbitmqexporter/marshaler.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmqexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter" + +import ( + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type marshaler struct { + logsMarshaler plog.Marshaler + tracesMarshaler ptrace.Marshaler + metricsMarshaler pmetric.Marshaler +} + +func newMarshaler(encoding *component.ID, host component.Host) (*marshaler, error) { + var ( + logsMarshaler plog.Marshaler = &plog.ProtoMarshaler{} + tracesMarshaler ptrace.Marshaler = &ptrace.ProtoMarshaler{} + metricsMarshaler pmetric.Marshaler = &pmetric.ProtoMarshaler{} + ) + + if encoding != nil { + ext, ok := host.GetExtensions()[*encoding] + if !ok { + return nil, fmt.Errorf("unknown encoding %q", encoding) + } + + logsMarshaler, _ = ext.(plog.Marshaler) + tracesMarshaler, _ = ext.(ptrace.Marshaler) + metricsMarshaler, _ = ext.(pmetric.Marshaler) + } + + m := marshaler{ + logsMarshaler: logsMarshaler, + tracesMarshaler: tracesMarshaler, + metricsMarshaler: metricsMarshaler, + } + return &m, nil +} diff --git a/exporter/rabbitmqexporter/marshaler_test.go b/exporter/rabbitmqexporter/marshaler_test.go new file mode 100644 index 000000000000..2fd002602b21 --- /dev/null +++ b/exporter/rabbitmqexporter/marshaler_test.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmqexporter + +import ( + "context" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestMarshalUsingEncodingExtension(t *testing.T) { + host := mockHostWithEncodings{} + extension := mockEncodingExtension{} + extensionMap := make(map[component.ID]component.Component) + extensionMap[encodingComponentId] = &extension + host.On("GetExtensions").Return(extensionMap) + + m, err := newMarshaler(&encodingComponentId, &host) + + require.NotNil(t, m) + require.NoError(t, err) + require.Equal(t, m.logsMarshaler, &extension) + require.Equal(t, m.metricsMarshaler, &extension) + require.Equal(t, m.tracesMarshaler, &extension) +} + +type mockHostWithEncodings struct { + mock.Mock +} + +type mockEncodingExtension struct { + mock.Mock +} + +func (h *mockHostWithEncodings) GetFactory(component.Kind, component.Type) component.Factory { + return nil +} + +func (h *mockHostWithEncodings) GetExporters() map[component.DataType]map[component.ID]component.Component { + return nil +} + +func (h *mockHostWithEncodings) GetExtensions() map[component.ID]component.Component { + args := h.Called() + return args.Get(0).(map[component.ID]component.Component) +} + +func (m *mockEncodingExtension) MarshalLogs(plog.Logs) ([]byte, error) { + return nil, nil +} + +func (m *mockEncodingExtension) MarshalTraces(ptrace.Traces) ([]byte, error) { + return nil, nil +} + +func (m *mockEncodingExtension) MarshalMetrics(pmetric.Metrics) ([]byte, error) { + return nil, nil +} + +func (m mockEncodingExtension) Start(context.Context, component.Host) error { + return nil +} + +func (m mockEncodingExtension) Shutdown(context.Context) error { + return nil +} diff --git a/exporter/rabbitmqexporter/rabbitmq_exporter.go b/exporter/rabbitmqexporter/rabbitmq_exporter.go index 0c4d326a1e47..39f9c0a12940 100644 --- a/exporter/rabbitmqexporter/rabbitmq_exporter.go +++ b/exporter/rabbitmqexporter/rabbitmq_exporter.go @@ -5,7 +5,10 @@ package rabbitmqexporter // import "github.com/open-telemetry/opentelemetry-coll import ( "context" + "crypto/tls" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" + amqp "github.com/rabbitmq/amqp091-go" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -13,42 +16,114 @@ import ( ) type rabbitmqExporter struct { - config *Config - settings component.TelemetrySettings + config *Config + tlsFactory + settings component.TelemetrySettings + routingKey string + connectionName string + *marshaler + publisherFactory + publisher publisher.Publisher } -func newRabbitmqExporter(cfg *Config, set component.TelemetrySettings) *rabbitmqExporter { - return &rabbitmqExporter{ - config: cfg, - settings: set, +type publisherFactory = func(publisher.DialConfig) (publisher.Publisher, error) +type tlsFactory = func() (*tls.Config, error) + +func newRabbitmqExporter(cfg *Config, set component.TelemetrySettings, publisherFactory publisherFactory, tlsFactory tlsFactory, routingKey string, connectionName string) *rabbitmqExporter { + exporter := &rabbitmqExporter{ + config: cfg, + settings: set, + routingKey: routingKey, + connectionName: connectionName, + publisherFactory: publisherFactory, + tlsFactory: tlsFactory, } + return exporter } -func (s *rabbitmqExporter) start(_ context.Context, _ component.Host) error { +func (e *rabbitmqExporter) start(_ context.Context, host component.Host) error { + m, err := newMarshaler(e.config.EncodingExtensionId, host) + if err != nil { + return err + } + e.marshaler = m + + dialConfig := publisher.DialConfig{ + Url: e.config.Connection.Endpoint, + Vhost: e.config.Connection.VHost, + Auth: &amqp.PlainAuth{ + Username: e.config.Connection.Auth.Plain.Username, + Password: e.config.Connection.Auth.Plain.Password, + }, + Durable: e.config.Durable, + ConnectionName: e.connectionName, + ConnectionTimeout: e.config.Connection.ConnectionTimeout, + Heartbeat: e.config.Connection.Heartbeat, + PublishConfirmationTimeout: e.config.Connection.PublishConfirmationTimeout, + } + + tlsConfig, err := e.tlsFactory() + if err != nil { + return err + } + dialConfig.TLS = tlsConfig + + e.settings.Logger.Info("Establishing initial connection to RabbitMQ") + p, err := e.publisherFactory(dialConfig) + e.publisher = p + + if err != nil { + return err + } - // To Be Implemented return nil } -func (s *rabbitmqExporter) pushTraces(_ context.Context, _ ptrace.Traces) error { +func (e *rabbitmqExporter) publishTraces(context context.Context, traces ptrace.Traces) error { + body, err := e.tracesMarshaler.MarshalTraces(traces) + if err != nil { + return err + } - // To Be Implemented - return nil + message := publisher.Message{ + Exchange: e.config.Routing.Exchange, + RoutingKey: e.routingKey, + Body: body, + } + return e.publisher.Publish(context, message) } -func (s *rabbitmqExporter) pushMetrics(_ context.Context, _ pmetric.Metrics) error { +func (e *rabbitmqExporter) publishMetrics(context context.Context, metrics pmetric.Metrics) error { + body, err := e.metricsMarshaler.MarshalMetrics(metrics) + if err != nil { + return err + } - // To Be Implemented - return nil + message := publisher.Message{ + Exchange: e.config.Routing.Exchange, + RoutingKey: e.routingKey, + Body: body, + } + return e.publisher.Publish(context, message) } -func (s *rabbitmqExporter) pushLogs(_ context.Context, _ plog.Logs) error { +func (e *rabbitmqExporter) publishLogs(context context.Context, logs plog.Logs) error { + body, err := e.logsMarshaler.MarshalLogs(logs) + if err != nil { + return err + } - // To Be Implemented - return nil + message := publisher.Message{ + Exchange: e.config.Routing.Exchange, + RoutingKey: e.routingKey, + Body: body, + } + return e.publisher.Publish(context, message) } -func (s *rabbitmqExporter) shutdown(_ context.Context) error { - // To Be Implemented +func (e *rabbitmqExporter) shutdown(_ context.Context) error { + if e.publisher != nil { + return e.publisher.Close() + } return nil } diff --git a/exporter/rabbitmqexporter/rabbitmq_exporter_test.go b/exporter/rabbitmqexporter/rabbitmq_exporter_test.go new file mode 100644 index 000000000000..f6392da4d594 --- /dev/null +++ b/exporter/rabbitmqexporter/rabbitmq_exporter_test.go @@ -0,0 +1,171 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmqexporter + +import ( + "context" + "crypto/tls" + "errors" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter/exportertest" +) + +const ( + routingKey = "routing_key" + connectionName = "connection_name" +) + +func TestStartAndShutdown(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + pub := mockPublisher{} + var pubFactory = func(config publisher.DialConfig) (publisher.Publisher, error) { + return &pub, nil + } + exporter := newRabbitmqExporter(cfg, exportertest.NewNopCreateSettings().TelemetrySettings, pubFactory, newTLSFactory(cfg), routingKey, connectionName) + + err := exporter.start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + + pub.On("Close").Return(nil) + err = exporter.shutdown(context.Background()) + require.NoError(t, err) + + pub.AssertExpectations(t) +} + +func TestStart_UnknownMarshallerEncoding(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + pub := mockPublisher{} + var pubFactory = func(config publisher.DialConfig) (publisher.Publisher, error) { + return &pub, nil + } + + unknownExtensionId := component.NewID(component.MustNewType("invalid_encoding")) + cfg.EncodingExtensionId = &unknownExtensionId + host := mockHost{} + exporter := newRabbitmqExporter(cfg, exportertest.NewNopCreateSettings().TelemetrySettings, pubFactory, newTLSFactory(cfg), routingKey, connectionName) + + err := exporter.start(context.Background(), host) + assert.EqualError(t, err, "unknown encoding \"invalid_encoding\"") + + err = exporter.shutdown(context.Background()) + require.NoError(t, err) +} + +func TestStart_PublisherCreationErr(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + var pubFactory = func(config publisher.DialConfig) (publisher.Publisher, error) { + return nil, errors.New("simulating error creating publisher") + } + exporter := newRabbitmqExporter(cfg, exportertest.NewNopCreateSettings().TelemetrySettings, pubFactory, newTLSFactory(cfg), routingKey, connectionName) + + err := exporter.start(context.Background(), componenttest.NewNopHost()) + assert.EqualError(t, err, "simulating error creating publisher") + + err = exporter.shutdown(context.Background()) + require.NoError(t, err) +} + +func TestStart_TLSError(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + pub := mockPublisher{} + var pubFactory = func(config publisher.DialConfig) (publisher.Publisher, error) { + return &pub, nil + } + tlsFactory := func() (*tls.Config, error) { + return nil, errors.New("simulating tls config error") + } + exporter := newRabbitmqExporter(cfg, exportertest.NewNopCreateSettings().TelemetrySettings, pubFactory, tlsFactory, routingKey, connectionName) + + err := exporter.start(context.Background(), componenttest.NewNopHost()) + assert.EqualError(t, err, "simulating tls config error") + + err = exporter.shutdown(context.Background()) + require.NoError(t, err) +} + +func TestPublishMetrics(t *testing.T) { + pub, exporter := exporterForPublishing(t) + + pub.On("Publish", mock.Anything, mock.MatchedBy(func(message publisher.Message) bool { + return message.RoutingKey == routingKey && len(message.Body) > 0 && message.Exchange == "" + })).Return(nil) + err := exporter.publishMetrics(context.Background(), testdata.GenerateMetricsOneMetric()) + + require.NoError(t, err) + pub.AssertExpectations(t) +} + +func TestPublishTraces(t *testing.T) { + pub, exporter := exporterForPublishing(t) + + pub.On("Publish", mock.Anything, mock.MatchedBy(func(message publisher.Message) bool { + return message.RoutingKey == routingKey && len(message.Body) > 0 && message.Exchange == "" + })).Return(nil) + err := exporter.publishTraces(context.Background(), testdata.GenerateTracesOneSpan()) + + require.NoError(t, err) + pub.AssertExpectations(t) +} + +func TestPublishLogs(t *testing.T) { + pub, exporter := exporterForPublishing(t) + + pub.On("Publish", mock.Anything, mock.MatchedBy(func(message publisher.Message) bool { + return message.RoutingKey == routingKey && len(message.Body) > 0 && message.Exchange == "" + })).Return(nil) + err := exporter.publishLogs(context.Background(), testdata.GenerateLogsOneLogRecord()) + + require.NoError(t, err) + pub.AssertExpectations(t) +} + +func exporterForPublishing(t *testing.T) (*mockPublisher, *rabbitmqExporter) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + pub := mockPublisher{} + var pubFactory = func(config publisher.DialConfig) (publisher.Publisher, error) { + return &pub, nil + } + exporter := newRabbitmqExporter(cfg, exportertest.NewNopCreateSettings().TelemetrySettings, pubFactory, newTLSFactory(cfg), routingKey, connectionName) + + err := exporter.start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + + return &pub, exporter +} + +type mockPublisher struct { + mock.Mock +} + +func (c *mockPublisher) Publish(ctx context.Context, message publisher.Message) error { + args := c.Called(ctx, message) + return args.Error(0) +} + +func (c *mockPublisher) Close() error { + args := c.Called() + return args.Error(0) +} + +type mockHost struct { + component.Host +} + +func (h mockHost) GetExtensions() map[component.ID]component.Component { + return make(map[component.ID]component.Component) +} diff --git a/exporter/rabbitmqexporter/testdata/test-config.yaml b/exporter/rabbitmqexporter/testdata/test-config.yaml index 18ea1c1f6d0a..eaf35e38cf3c 100644 --- a/exporter/rabbitmqexporter/testdata/test-config.yaml +++ b/exporter/rabbitmqexporter/testdata/test-config.yaml @@ -1,14 +1,22 @@ rabbitmq/all_fields: connection: - endpoint: amqp://localhost:5672 + endpoint: amqps://localhost:5672 vhost: vhost1 auth: - sasl: + plain: username: user password: pass + tls: + ca_file: "cert123" + insecure: true + connection_timeout: 1ms + heartbeat: 2ms + publish_confirmation_timeout: 3ms + routing: + exchange: amq.direct routing_key: custom_routing_key - message_body_encoding: otlp_json + encoding_extension: otlp_encoding/rabbitmq123 durable: false retry_on_failure: enabled: true @@ -17,6 +25,20 @@ rabbitmq/mandatory_fields: connection: endpoint: amqp://localhost:5672 auth: - sasl: + plain: + username: user + password: pass + +rabbitmq/missing_endpoint: + connection: + auth: + plain: username: user password: pass + +rabbitmq/missing_plainauth_username: + connection: + endpoint: amqp://localhost:5672 + auth: + plain: + password: pass \ No newline at end of file From d4775b12f08adb46da85cb3e5b5fff06e2803d28 Mon Sep 17 00:00:00 2001 From: swar8080 Date: Sun, 31 Mar 2024 20:15:25 -0400 Subject: [PATCH 2/6] fix linting issues --- exporter/rabbitmqexporter/config.go | 2 +- exporter/rabbitmqexporter/config_test.go | 6 +++--- exporter/rabbitmqexporter/factory.go | 2 +- exporter/rabbitmqexporter/go.mod | 10 ++++++++-- exporter/rabbitmqexporter/go.sum | 8 ++++---- .../internal/publisher/publisher.go | 13 ++++++------- .../internal/publisher/publisher_test.go | 14 +++++++------- exporter/rabbitmqexporter/marshaler_test.go | 8 ++++---- .../rabbitmqexporter/rabbitmq_exporter.go | 7 ++++--- .../rabbitmq_exporter_test.go | 19 ++++++++++--------- 10 files changed, 48 insertions(+), 41 deletions(-) diff --git a/exporter/rabbitmqexporter/config.go b/exporter/rabbitmqexporter/config.go index 34bf766ad4c1..f3bd7cdee175 100644 --- a/exporter/rabbitmqexporter/config.go +++ b/exporter/rabbitmqexporter/config.go @@ -15,7 +15,7 @@ import ( type Config struct { Connection ConnectionConfig `mapstructure:"connection"` Routing RoutingConfig `mapstructure:"routing"` - EncodingExtensionId *component.ID `mapstructure:"encoding_extension"` + EncodingExtensionID *component.ID `mapstructure:"encoding_extension"` Durable bool `mapstructure:"durable"` RetrySettings configretry.BackOffConfig `mapstructure:"retry_on_failure"` } diff --git a/exporter/rabbitmqexporter/config_test.go b/exporter/rabbitmqexporter/config_test.go index 41b04db7c342..1c82b113f1bd 100644 --- a/exporter/rabbitmqexporter/config_test.go +++ b/exporter/rabbitmqexporter/config_test.go @@ -19,7 +19,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/metadata" ) -var encodingComponentId = component.NewIDWithName(component.MustNewType("otlp_encoding"), "rabbitmq123") +var encodingComponentID = component.NewIDWithName(component.MustNewType("otlp_encoding"), "rabbitmq123") func TestLoadConfig(t *testing.T) { t.Parallel() @@ -53,7 +53,7 @@ func TestLoadConfig(t *testing.T) { }, }, TLSConfig: &configtls.ClientConfig{ - TLSSetting: configtls.Config{ + Config: configtls.Config{ CAFile: "cert123", }, Insecure: true, @@ -66,7 +66,7 @@ func TestLoadConfig(t *testing.T) { Exchange: "amq.direct", RoutingKey: "custom_routing_key", }, - EncodingExtensionId: &encodingComponentId, + EncodingExtensionID: &encodingComponentID, Durable: false, RetrySettings: configretry.BackOffConfig{ Enabled: true, diff --git a/exporter/rabbitmqexporter/factory.go b/exporter/rabbitmqexporter/factory.go index 19267e1ba119..6004ad0f2495 100644 --- a/exporter/rabbitmqexporter/factory.go +++ b/exporter/rabbitmqexporter/factory.go @@ -8,7 +8,6 @@ import ( "crypto/tls" "time" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" @@ -16,6 +15,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" ) const ( diff --git a/exporter/rabbitmqexporter/go.mod b/exporter/rabbitmqexporter/go.mod index bbd567655b01..7c61b00214e5 100644 --- a/exporter/rabbitmqexporter/go.mod +++ b/exporter/rabbitmqexporter/go.mod @@ -8,7 +8,7 @@ require ( github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.97.1-0.20240327181407-1038b67c85a0 go.opentelemetry.io/collector/config/configretry v0.97.1-0.20240327181407-1038b67c85a0 - go.opentelemetry.io/collector/config/configtls v0.97.0 + go.opentelemetry.io/collector/config/configtls v0.97.1-0.20240327181407-1038b67c85a0 go.opentelemetry.io/collector/confmap v0.97.1-0.20240327181407-1038b67c85a0 go.opentelemetry.io/collector/consumer v0.97.1-0.20240327181407-1038b67c85a0 go.opentelemetry.io/collector/exporter v0.97.1-0.20240327181407-1038b67c85a0 @@ -46,7 +46,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/collector v0.97.1-0.20240327181407-1038b67c85a0 // indirect - go.opentelemetry.io/collector/config/configopaque v1.4.0 // indirect + go.opentelemetry.io/collector/config/configopaque v1.4.1-0.20240327181407-1038b67c85a0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240327181407-1038b67c85a0 // indirect go.opentelemetry.io/collector/extension v0.97.1-0.20240327181407-1038b67c85a0 // indirect go.opentelemetry.io/collector/receiver v0.97.1-0.20240327181407-1038b67c85a0 // indirect @@ -65,3 +65,9 @@ require ( ) replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/exporter/rabbitmqexporter/go.sum b/exporter/rabbitmqexporter/go.sum index d9d989d520af..b88ee5f371e6 100644 --- a/exporter/rabbitmqexporter/go.sum +++ b/exporter/rabbitmqexporter/go.sum @@ -82,14 +82,14 @@ go.opentelemetry.io/collector v0.97.1-0.20240327181407-1038b67c85a0 h1:dvxsnQ+nB go.opentelemetry.io/collector v0.97.1-0.20240327181407-1038b67c85a0/go.mod h1:V6xquYAaO2VHVu4DBK28JYuikRdZajh7DH5Vl/Y8NiA= go.opentelemetry.io/collector/component v0.97.1-0.20240327181407-1038b67c85a0 h1:OBXZrNlbQtCfpcqfVmKfsiqEKket/cHm61e4l2hfxuo= go.opentelemetry.io/collector/component v0.97.1-0.20240327181407-1038b67c85a0/go.mod h1:F/m3HMlkb16RKI7wJjgbECK1IZkAcmB8bu7yD8XOkwM= -go.opentelemetry.io/collector/config/configopaque v1.4.0 h1:5KgD9oLN+N07HqDsLzUrU0mE2pC8cMhrCSC1Nf8CEO4= -go.opentelemetry.io/collector/config/configopaque v1.4.0/go.mod h1:7Qzo69x7i+FaNELeA9jmZtVvfnR5lE6JYa5YEOCJPFQ= +go.opentelemetry.io/collector/config/configopaque v1.4.1-0.20240327181407-1038b67c85a0 h1:nZkYYEdASkZ5L43eclvdWOnM8cB4Ga4FxPv+qLMaLHQ= +go.opentelemetry.io/collector/config/configopaque v1.4.1-0.20240327181407-1038b67c85a0/go.mod h1:7Qzo69x7i+FaNELeA9jmZtVvfnR5lE6JYa5YEOCJPFQ= go.opentelemetry.io/collector/config/configretry v0.97.1-0.20240327181407-1038b67c85a0 h1:kkApwmm9g5yXaKwVR+9gWTv/nOdziAHX3tcVrAv+1Bc= go.opentelemetry.io/collector/config/configretry v0.97.1-0.20240327181407-1038b67c85a0/go.mod h1:s7A6ZGxK8bxqidFzwbr2pITzbsB2qf+aeHEDQDcanV8= go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240327181407-1038b67c85a0 h1:n6gNCKxrCs3hD+jafL93JdtPVl05p+C5PecoNE7YUrw= go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240327181407-1038b67c85a0/go.mod h1:YV5PaOdtnU1xRomPcYqoHmyCr48tnaAREeGO96EZw8o= -go.opentelemetry.io/collector/config/configtls v0.97.0 h1:wmXj/rKQUGMZzbHVCTyB+xUWImsGxnLqhivwjBE0FdI= -go.opentelemetry.io/collector/config/configtls v0.97.0/go.mod h1:ev/fMI6hm1WTSHHEAEoVjF3RZj0qf38E/XO5itFku7k= +go.opentelemetry.io/collector/config/configtls v0.97.1-0.20240327181407-1038b67c85a0 h1:xc0wUkz0fFnoUdqtGyyoKMwDO1saOaGj60aunfk8lf0= +go.opentelemetry.io/collector/config/configtls v0.97.1-0.20240327181407-1038b67c85a0/go.mod h1:ev/fMI6hm1WTSHHEAEoVjF3RZj0qf38E/XO5itFku7k= go.opentelemetry.io/collector/confmap v0.97.1-0.20240327181407-1038b67c85a0 h1:Cm5WDKNnmKLZmiAzodv3LLodAN3fAZFl+Q6jek/K6xU= go.opentelemetry.io/collector/confmap v0.97.1-0.20240327181407-1038b67c85a0/go.mod h1:AnJmZcZoOLuykSXGiAf3shi11ZZk5ei4tZd9dDTTpWE= go.opentelemetry.io/collector/consumer v0.97.1-0.20240327181407-1038b67c85a0 h1:13pZ9wIF0ogiuAk+KVV8ekEGyYXqS44PV9tIKbAIzLc= diff --git a/exporter/rabbitmqexporter/internal/publisher/publisher.go b/exporter/rabbitmqexporter/internal/publisher/publisher.go index c9d3cfe1382c..c4a6b994ba74 100644 --- a/exporter/rabbitmqexporter/internal/publisher/publisher.go +++ b/exporter/rabbitmqexporter/internal/publisher/publisher.go @@ -16,7 +16,7 @@ import ( ) type DialConfig struct { - Url string + URL string Durable bool Vhost string Auth amqp.Authentication @@ -75,9 +75,9 @@ func (p *publisher) Publish(ctx context.Context, message Message) error { channel, err := p.connection.Channel() defer func(channel Channel) { if channel != nil { - err := channel.Close() - if err != nil { - p.logger.Warn("Failed closing channel", zap.Error(err)) + err2 := channel.Close() + if err2 != nil { + p.logger.Warn("Failed closing channel", zap.Error(err2)) } } }(channel) @@ -148,9 +148,8 @@ func (p *publisher) reconnectIfUnhealthy() error { if err := p.connect(); err != nil { return errors.Join(errors.New("failed attempt at restoring unhealthy connection"), err) - } else { - p.logger.Info("Successfully restored unhealthy rabbitmq connection") } + p.logger.Info("Successfully restored unhealthy rabbitmq connection") } return nil @@ -162,7 +161,7 @@ func (p *publisher) connect() error { properties := amqp.Table{} properties.SetClientConnectionName(p.config.ConnectionName) - connection, err := p.client.DialConfig(p.config.Url, amqp.Config{ + connection, err := p.client.DialConfig(p.config.URL, amqp.Config{ SASL: []amqp.Authentication{p.config.Auth}, Vhost: p.config.Vhost, Heartbeat: p.config.Heartbeat, diff --git a/exporter/rabbitmqexporter/internal/publisher/publisher_test.go b/exporter/rabbitmqexporter/internal/publisher/publisher_test.go index cd1b1f028598..5d8b63693e48 100644 --- a/exporter/rabbitmqexporter/internal/publisher/publisher_test.go +++ b/exporter/rabbitmqexporter/internal/publisher/publisher_test.go @@ -17,7 +17,7 @@ import ( ) const ( - connectUrl = "amqp://localhost" + connectURL = "amqp://localhost" exchange = "amq.direct" routingKey = "some_routing_key" ) @@ -26,11 +26,11 @@ func TestConnectAndClose(t *testing.T) { client := mockClient{} connection := mockConnection{} dialConfig := DialConfig{ - Url: connectUrl, + URL: connectURL, } // Start the connection successfully - client.On("DialConfig", connectUrl, mock.Anything).Return(&connection, nil) + client.On("DialConfig", connectURL, mock.Anything).Return(&connection, nil) connection.On("NotifyClose", mock.Anything).Return(make(chan *amqp.Error)) publisher, err := NewConnection(zap.NewNop(), &client, dialConfig) @@ -51,10 +51,10 @@ func TestConnectAndClose(t *testing.T) { func TestConnectionErrorAndClose(t *testing.T) { client := mockClient{} dialConfig := DialConfig{ - Url: connectUrl, + URL: connectURL, } - client.On("DialConfig", connectUrl, mock.Anything).Return(nil, errors.New("simulated connection error")) + client.On("DialConfig", connectURL, mock.Anything).Return(nil, errors.New("simulated connection error")) publisher, err := NewConnection(zap.NewNop(), &client, dialConfig) assert.EqualError(t, err, "simulated connection error") @@ -201,7 +201,7 @@ func TestFailRestoreConnectionDuringPublishing(t *testing.T) { connection.On("IsClosed").Return(true) resetCall(client.ExpectedCalls, "DialConfig", t) - client.On("DialConfig", connectUrl, mock.Anything).Return(nil, errors.New("simulated connection error")) + client.On("DialConfig", connectURL, mock.Anything).Return(nil, errors.New("simulated connection error")) err = publisher.Publish(context.Background(), makePublishMessage()) assert.EqualError(t, err, "failed attempt at restoring unhealthy connection\nsimulated connection error") @@ -381,7 +381,7 @@ func makePublishMessage() Message { func makeDialConfig() DialConfig { return DialConfig{ - Url: connectUrl, + URL: connectURL, PublishConfirmationTimeout: time.Millisecond * 20, Durable: true, } diff --git a/exporter/rabbitmqexporter/marshaler_test.go b/exporter/rabbitmqexporter/marshaler_test.go index 2fd002602b21..b28c24d8c701 100644 --- a/exporter/rabbitmqexporter/marshaler_test.go +++ b/exporter/rabbitmqexporter/marshaler_test.go @@ -19,10 +19,10 @@ func TestMarshalUsingEncodingExtension(t *testing.T) { host := mockHostWithEncodings{} extension := mockEncodingExtension{} extensionMap := make(map[component.ID]component.Component) - extensionMap[encodingComponentId] = &extension + extensionMap[encodingComponentID] = &extension host.On("GetExtensions").Return(extensionMap) - m, err := newMarshaler(&encodingComponentId, &host) + m, err := newMarshaler(&encodingComponentID, &host) require.NotNil(t, m) require.NoError(t, err) @@ -64,10 +64,10 @@ func (m *mockEncodingExtension) MarshalMetrics(pmetric.Metrics) ([]byte, error) return nil, nil } -func (m mockEncodingExtension) Start(context.Context, component.Host) error { +func (m *mockEncodingExtension) Start(context.Context, component.Host) error { return nil } -func (m mockEncodingExtension) Shutdown(context.Context) error { +func (m *mockEncodingExtension) Shutdown(context.Context) error { return nil } diff --git a/exporter/rabbitmqexporter/rabbitmq_exporter.go b/exporter/rabbitmqexporter/rabbitmq_exporter.go index 39f9c0a12940..c6f01297cdda 100644 --- a/exporter/rabbitmqexporter/rabbitmq_exporter.go +++ b/exporter/rabbitmqexporter/rabbitmq_exporter.go @@ -7,12 +7,13 @@ import ( "context" "crypto/tls" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" amqp "github.com/rabbitmq/amqp091-go" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" ) type rabbitmqExporter struct { @@ -42,14 +43,14 @@ func newRabbitmqExporter(cfg *Config, set component.TelemetrySettings, publisher } func (e *rabbitmqExporter) start(_ context.Context, host component.Host) error { - m, err := newMarshaler(e.config.EncodingExtensionId, host) + m, err := newMarshaler(e.config.EncodingExtensionID, host) if err != nil { return err } e.marshaler = m dialConfig := publisher.DialConfig{ - Url: e.config.Connection.Endpoint, + URL: e.config.Connection.Endpoint, Vhost: e.config.Connection.VHost, Auth: &amqp.PlainAuth{ Username: e.config.Connection.Auth.Plain.Username, diff --git a/exporter/rabbitmqexporter/rabbitmq_exporter_test.go b/exporter/rabbitmqexporter/rabbitmq_exporter_test.go index f6392da4d594..3f0f401d5154 100644 --- a/exporter/rabbitmqexporter/rabbitmq_exporter_test.go +++ b/exporter/rabbitmqexporter/rabbitmq_exporter_test.go @@ -9,14 +9,15 @@ import ( "errors" "testing" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/exporter/exportertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" ) const ( @@ -28,7 +29,7 @@ func TestStartAndShutdown(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) pub := mockPublisher{} - var pubFactory = func(config publisher.DialConfig) (publisher.Publisher, error) { + var pubFactory = func(publisher.DialConfig) (publisher.Publisher, error) { return &pub, nil } exporter := newRabbitmqExporter(cfg, exportertest.NewNopCreateSettings().TelemetrySettings, pubFactory, newTLSFactory(cfg), routingKey, connectionName) @@ -47,12 +48,12 @@ func TestStart_UnknownMarshallerEncoding(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) pub := mockPublisher{} - var pubFactory = func(config publisher.DialConfig) (publisher.Publisher, error) { + var pubFactory = func(publisher.DialConfig) (publisher.Publisher, error) { return &pub, nil } - unknownExtensionId := component.NewID(component.MustNewType("invalid_encoding")) - cfg.EncodingExtensionId = &unknownExtensionId + unknownExtensionID := component.NewID(component.MustNewType("invalid_encoding")) + cfg.EncodingExtensionID = &unknownExtensionID host := mockHost{} exporter := newRabbitmqExporter(cfg, exportertest.NewNopCreateSettings().TelemetrySettings, pubFactory, newTLSFactory(cfg), routingKey, connectionName) @@ -66,7 +67,7 @@ func TestStart_UnknownMarshallerEncoding(t *testing.T) { func TestStart_PublisherCreationErr(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) - var pubFactory = func(config publisher.DialConfig) (publisher.Publisher, error) { + var pubFactory = func(publisher.DialConfig) (publisher.Publisher, error) { return nil, errors.New("simulating error creating publisher") } exporter := newRabbitmqExporter(cfg, exportertest.NewNopCreateSettings().TelemetrySettings, pubFactory, newTLSFactory(cfg), routingKey, connectionName) @@ -82,7 +83,7 @@ func TestStart_TLSError(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) pub := mockPublisher{} - var pubFactory = func(config publisher.DialConfig) (publisher.Publisher, error) { + var pubFactory = func(publisher.DialConfig) (publisher.Publisher, error) { return &pub, nil } tlsFactory := func() (*tls.Config, error) { @@ -137,7 +138,7 @@ func exporterForPublishing(t *testing.T) (*mockPublisher, *rabbitmqExporter) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) pub := mockPublisher{} - var pubFactory = func(config publisher.DialConfig) (publisher.Publisher, error) { + var pubFactory = func(publisher.DialConfig) (publisher.Publisher, error) { return &pub, nil } exporter := newRabbitmqExporter(cfg, exportertest.NewNopCreateSettings().TelemetrySettings, pubFactory, newTLSFactory(cfg), routingKey, connectionName) From d90caf607f8b8295ded0048e30008cfb6bbb6a7c Mon Sep 17 00:00:00 2001 From: swar8080 Date: Mon, 1 Apr 2024 21:43:56 -0400 Subject: [PATCH 3/6] Add some comments to the README --- exporter/rabbitmqexporter/README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/exporter/rabbitmqexporter/README.md b/exporter/rabbitmqexporter/README.md index 2ea62fd86f1c..d99c6ad85066 100644 --- a/exporter/rabbitmqexporter/README.md +++ b/exporter/rabbitmqexporter/README.md @@ -10,7 +10,11 @@ [development]: https://github.com/open-telemetry/opentelemetry-collector#development -Exports metrics, traces, and logs to [RabbitMQ](https://www.rabbitmq.com/) using the AMQP 0.9.1 protocol +Exports metrics, traces, and logs to [RabbitMQ](https://www.rabbitmq.com/) using the AMQP 0.9.1 protocol. + +Messages are published to the [default exchange](https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-default) direct exchange, but optionally can be published to a different direct exchange. + +This component expects that exchanges, queues, and bindings already exist - they are not currently created by this component. ## Getting Started From c9b20b765032ac52a758b870a72c5a4d33918547 Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Wed, 10 Apr 2024 15:16:32 +0200 Subject: [PATCH 4/6] go mod tidy --- exporter/rabbitmqexporter/go.mod | 6 ++++-- exporter/rabbitmqexporter/go.sum | 8 ++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/exporter/rabbitmqexporter/go.mod b/exporter/rabbitmqexporter/go.mod index 644d5425a073..6e6634dbcb55 100644 --- a/exporter/rabbitmqexporter/go.mod +++ b/exporter/rabbitmqexporter/go.mod @@ -8,6 +8,7 @@ require ( github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.97.1-0.20240409140257-792fac1b62d4 go.opentelemetry.io/collector/config/configretry v0.97.1-0.20240409140257-792fac1b62d4 + go.opentelemetry.io/collector/config/configtls v0.97.0 go.opentelemetry.io/collector/confmap v0.97.1-0.20240409140257-792fac1b62d4 go.opentelemetry.io/collector/consumer v0.97.1-0.20240409140257-792fac1b62d4 go.opentelemetry.io/collector/exporter v0.97.1-0.20240409140257-792fac1b62d4 @@ -21,7 +22,7 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/logr v1.4.1 // indirect @@ -43,7 +44,9 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/collector v0.97.1-0.20240409140257-792fac1b62d4 // indirect + go.opentelemetry.io/collector/config/configopaque v1.4.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240409140257-792fac1b62d4 // indirect go.opentelemetry.io/collector/extension v0.97.1-0.20240409140257-792fac1b62d4 // indirect go.opentelemetry.io/collector/receiver v0.97.1-0.20240409140257-792fac1b62d4 // indirect @@ -52,7 +55,6 @@ require ( go.opentelemetry.io/otel/sdk v1.24.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/exporter/rabbitmqexporter/go.sum b/exporter/rabbitmqexporter/go.sum index b39808a956e2..462a31f53c1b 100644 --- a/exporter/rabbitmqexporter/go.sum +++ b/exporter/rabbitmqexporter/go.sum @@ -2,8 +2,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -82,10 +82,14 @@ go.opentelemetry.io/collector v0.97.1-0.20240409140257-792fac1b62d4 h1:2t5axZqKO go.opentelemetry.io/collector v0.97.1-0.20240409140257-792fac1b62d4/go.mod h1:tqJMDpvR9AgWwvfUiQnvYBfOc2jCQdmA700G92iU1to= go.opentelemetry.io/collector/component v0.97.1-0.20240409140257-792fac1b62d4 h1:47QO6HD8Ts3w9fAV0Qf3lIH8RFP1vXK1xEPAcL7FYkE= go.opentelemetry.io/collector/component v0.97.1-0.20240409140257-792fac1b62d4/go.mod h1:um+Bn0rshAr3diL1p+GCpw3cRxENFYMNIPBL6Hl8Pok= +go.opentelemetry.io/collector/config/configopaque v1.4.0 h1:5KgD9oLN+N07HqDsLzUrU0mE2pC8cMhrCSC1Nf8CEO4= +go.opentelemetry.io/collector/config/configopaque v1.4.0/go.mod h1:7Qzo69x7i+FaNELeA9jmZtVvfnR5lE6JYa5YEOCJPFQ= go.opentelemetry.io/collector/config/configretry v0.97.1-0.20240409140257-792fac1b62d4 h1:ig9pOmasum4InkD+9bCWwvH+Y81fHW1SfYnigJJKjpc= go.opentelemetry.io/collector/config/configretry v0.97.1-0.20240409140257-792fac1b62d4/go.mod h1:uRdmPeCkrW9Zsadh2WEbQ1AGXGYJ02vCfmmT+0g69nY= go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240409140257-792fac1b62d4 h1:pgNIGcQNf2rAI7qtUV7UGk+4D8RD/m8CeoU/Sv5qvIM= go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240409140257-792fac1b62d4/go.mod h1:YV5PaOdtnU1xRomPcYqoHmyCr48tnaAREeGO96EZw8o= +go.opentelemetry.io/collector/config/configtls v0.97.0 h1:wmXj/rKQUGMZzbHVCTyB+xUWImsGxnLqhivwjBE0FdI= +go.opentelemetry.io/collector/config/configtls v0.97.0/go.mod h1:ev/fMI6hm1WTSHHEAEoVjF3RZj0qf38E/XO5itFku7k= go.opentelemetry.io/collector/confmap v0.97.1-0.20240409140257-792fac1b62d4 h1:66aIqz1za7iONOam/MFy34022jW2RugCLu1ULadOtl0= go.opentelemetry.io/collector/confmap v0.97.1-0.20240409140257-792fac1b62d4/go.mod h1:U5bMvFybP/N7lvWWaLYRh7jKHEVQufdQTzHUXxvN+4o= go.opentelemetry.io/collector/consumer v0.97.1-0.20240409140257-792fac1b62d4 h1:+LSirb0OpU/eanlXFC3P8DoP9S4GLmpvkDJNq7Vl7Eg= From f0d163ab4260395d97906f2bc8d3a55e7dd5d2c6 Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Wed, 10 Apr 2024 15:25:18 +0200 Subject: [PATCH 5/6] fix module versions --- exporter/rabbitmqexporter/go.mod | 4 ++-- exporter/rabbitmqexporter/go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/exporter/rabbitmqexporter/go.mod b/exporter/rabbitmqexporter/go.mod index 6e6634dbcb55..e987f790ffa0 100644 --- a/exporter/rabbitmqexporter/go.mod +++ b/exporter/rabbitmqexporter/go.mod @@ -8,7 +8,7 @@ require ( github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.97.1-0.20240409140257-792fac1b62d4 go.opentelemetry.io/collector/config/configretry v0.97.1-0.20240409140257-792fac1b62d4 - go.opentelemetry.io/collector/config/configtls v0.97.0 + go.opentelemetry.io/collector/config/configtls v0.97.1-0.20240409140257-792fac1b62d4 go.opentelemetry.io/collector/confmap v0.97.1-0.20240409140257-792fac1b62d4 go.opentelemetry.io/collector/consumer v0.97.1-0.20240409140257-792fac1b62d4 go.opentelemetry.io/collector/exporter v0.97.1-0.20240409140257-792fac1b62d4 @@ -46,7 +46,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/collector v0.97.1-0.20240409140257-792fac1b62d4 // indirect - go.opentelemetry.io/collector/config/configopaque v1.4.0 // indirect + go.opentelemetry.io/collector/config/configopaque v1.4.1-0.20240409140257-792fac1b62d4 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240409140257-792fac1b62d4 // indirect go.opentelemetry.io/collector/extension v0.97.1-0.20240409140257-792fac1b62d4 // indirect go.opentelemetry.io/collector/receiver v0.97.1-0.20240409140257-792fac1b62d4 // indirect diff --git a/exporter/rabbitmqexporter/go.sum b/exporter/rabbitmqexporter/go.sum index 462a31f53c1b..4d97bd3010f1 100644 --- a/exporter/rabbitmqexporter/go.sum +++ b/exporter/rabbitmqexporter/go.sum @@ -82,14 +82,14 @@ go.opentelemetry.io/collector v0.97.1-0.20240409140257-792fac1b62d4 h1:2t5axZqKO go.opentelemetry.io/collector v0.97.1-0.20240409140257-792fac1b62d4/go.mod h1:tqJMDpvR9AgWwvfUiQnvYBfOc2jCQdmA700G92iU1to= go.opentelemetry.io/collector/component v0.97.1-0.20240409140257-792fac1b62d4 h1:47QO6HD8Ts3w9fAV0Qf3lIH8RFP1vXK1xEPAcL7FYkE= go.opentelemetry.io/collector/component v0.97.1-0.20240409140257-792fac1b62d4/go.mod h1:um+Bn0rshAr3diL1p+GCpw3cRxENFYMNIPBL6Hl8Pok= -go.opentelemetry.io/collector/config/configopaque v1.4.0 h1:5KgD9oLN+N07HqDsLzUrU0mE2pC8cMhrCSC1Nf8CEO4= -go.opentelemetry.io/collector/config/configopaque v1.4.0/go.mod h1:7Qzo69x7i+FaNELeA9jmZtVvfnR5lE6JYa5YEOCJPFQ= +go.opentelemetry.io/collector/config/configopaque v1.4.1-0.20240409140257-792fac1b62d4 h1:YYWytbJOhBQKt2NyCxc10Wk57kYs5i5jiZNftOu9Qw0= +go.opentelemetry.io/collector/config/configopaque v1.4.1-0.20240409140257-792fac1b62d4/go.mod h1:U1IpLFisA6ChU2xBKSJfy7ivI/DaDSurgvoEFKfPVwo= go.opentelemetry.io/collector/config/configretry v0.97.1-0.20240409140257-792fac1b62d4 h1:ig9pOmasum4InkD+9bCWwvH+Y81fHW1SfYnigJJKjpc= go.opentelemetry.io/collector/config/configretry v0.97.1-0.20240409140257-792fac1b62d4/go.mod h1:uRdmPeCkrW9Zsadh2WEbQ1AGXGYJ02vCfmmT+0g69nY= go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240409140257-792fac1b62d4 h1:pgNIGcQNf2rAI7qtUV7UGk+4D8RD/m8CeoU/Sv5qvIM= go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240409140257-792fac1b62d4/go.mod h1:YV5PaOdtnU1xRomPcYqoHmyCr48tnaAREeGO96EZw8o= -go.opentelemetry.io/collector/config/configtls v0.97.0 h1:wmXj/rKQUGMZzbHVCTyB+xUWImsGxnLqhivwjBE0FdI= -go.opentelemetry.io/collector/config/configtls v0.97.0/go.mod h1:ev/fMI6hm1WTSHHEAEoVjF3RZj0qf38E/XO5itFku7k= +go.opentelemetry.io/collector/config/configtls v0.97.1-0.20240409140257-792fac1b62d4 h1:1rmikYza0o9sCLl1b0//YhSweWVxBNnON9fIV5c0Ca0= +go.opentelemetry.io/collector/config/configtls v0.97.1-0.20240409140257-792fac1b62d4/go.mod h1:ev/fMI6hm1WTSHHEAEoVjF3RZj0qf38E/XO5itFku7k= go.opentelemetry.io/collector/confmap v0.97.1-0.20240409140257-792fac1b62d4 h1:66aIqz1za7iONOam/MFy34022jW2RugCLu1ULadOtl0= go.opentelemetry.io/collector/confmap v0.97.1-0.20240409140257-792fac1b62d4/go.mod h1:U5bMvFybP/N7lvWWaLYRh7jKHEVQufdQTzHUXxvN+4o= go.opentelemetry.io/collector/consumer v0.97.1-0.20240409140257-792fac1b62d4 h1:+LSirb0OpU/eanlXFC3P8DoP9S4GLmpvkDJNq7Vl7Eg= From 8f09ac98d64e67808d12969bce4f153d69ce7468 Mon Sep 17 00:00:00 2001 From: swar8080 Date: Wed, 17 Apr 2024 19:50:41 -0400 Subject: [PATCH 6/6] Handle updated LoadTLSConfig function signature --- exporter/rabbitmqexporter/factory.go | 2 +- exporter/rabbitmqexporter/rabbitmq_exporter.go | 6 +++--- exporter/rabbitmqexporter/rabbitmq_exporter_test.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/exporter/rabbitmqexporter/factory.go b/exporter/rabbitmqexporter/factory.go index 6004ad0f2495..b87ced0fe0ec 100644 --- a/exporter/rabbitmqexporter/factory.go +++ b/exporter/rabbitmqexporter/factory.go @@ -141,7 +141,7 @@ func newTLSFactory(config *Config) tlsFactory { if config.Connection.TLSConfig != nil { return config.Connection.TLSConfig.LoadTLSConfig } - return func() (*tls.Config, error) { + return func(context.Context) (*tls.Config, error) { return nil, nil } } diff --git a/exporter/rabbitmqexporter/rabbitmq_exporter.go b/exporter/rabbitmqexporter/rabbitmq_exporter.go index c6f01297cdda..40d068e3f076 100644 --- a/exporter/rabbitmqexporter/rabbitmq_exporter.go +++ b/exporter/rabbitmqexporter/rabbitmq_exporter.go @@ -28,7 +28,7 @@ type rabbitmqExporter struct { } type publisherFactory = func(publisher.DialConfig) (publisher.Publisher, error) -type tlsFactory = func() (*tls.Config, error) +type tlsFactory = func(context.Context) (*tls.Config, error) func newRabbitmqExporter(cfg *Config, set component.TelemetrySettings, publisherFactory publisherFactory, tlsFactory tlsFactory, routingKey string, connectionName string) *rabbitmqExporter { exporter := &rabbitmqExporter{ @@ -42,7 +42,7 @@ func newRabbitmqExporter(cfg *Config, set component.TelemetrySettings, publisher return exporter } -func (e *rabbitmqExporter) start(_ context.Context, host component.Host) error { +func (e *rabbitmqExporter) start(ctx context.Context, host component.Host) error { m, err := newMarshaler(e.config.EncodingExtensionID, host) if err != nil { return err @@ -63,7 +63,7 @@ func (e *rabbitmqExporter) start(_ context.Context, host component.Host) error { PublishConfirmationTimeout: e.config.Connection.PublishConfirmationTimeout, } - tlsConfig, err := e.tlsFactory() + tlsConfig, err := e.tlsFactory(ctx) if err != nil { return err } diff --git a/exporter/rabbitmqexporter/rabbitmq_exporter_test.go b/exporter/rabbitmqexporter/rabbitmq_exporter_test.go index 3f0f401d5154..7ede372ff1d2 100644 --- a/exporter/rabbitmqexporter/rabbitmq_exporter_test.go +++ b/exporter/rabbitmqexporter/rabbitmq_exporter_test.go @@ -86,7 +86,7 @@ func TestStart_TLSError(t *testing.T) { var pubFactory = func(publisher.DialConfig) (publisher.Publisher, error) { return &pub, nil } - tlsFactory := func() (*tls.Config, error) { + tlsFactory := func(context.Context) (*tls.Config, error) { return nil, errors.New("simulating tls config error") } exporter := newRabbitmqExporter(cfg, exportertest.NewNopCreateSettings().TelemetrySettings, pubFactory, tlsFactory, routingKey, connectionName)