Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RabbitMQ exporter implementation #32051

Merged
27 changes: 27 additions & 0 deletions .chloggen/rabbitmq-exporter-implementation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: rabbitmqexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implements the RabbitMQ exporter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [28891]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
24 changes: 16 additions & 8 deletions exporter/rabbitmqexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
[development]: https://github.com/open-telemetry/opentelemetry-collector#development
<!-- end autogenerated section -->

Exports metrics, traces, and logs to [RabbitMQ](https://www.rabbitmq.com/) using the AMQP 0.9.1 protocol
Exports metrics, traces, and logs to [RabbitMQ](https://www.rabbitmq.com/) using the AMQP 0.9.1 protocol.

Messages are published to the [default exchange](https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-default) direct exchange, but optionally can be published to a different direct exchange.

This component expects that exchanges, queues, and bindings already exist - they are not currently created by this component.

## Getting Started

Expand All @@ -19,16 +23,15 @@ The following settings can be configured:
- `endpoint` (required, ex = amqp://localhost:5672): Endpoint to connect to RabbitMQ
- `vhost` (optional): The RabbitMQ [virtual host](https://www.rabbitmq.com/docs/vhosts) to connect to
- `auth`:
- `sasl`: Configuration if using SASL PLAIN authentication
- `plain`: Configuration if using SASL PLAIN authentication
- `username` (required): username for authentication
- `password` (required): password for authentication
- `tls` (optional): TODO, need to add this
- `password`: password for authentication
- `tls` (optional): [TLS configuration](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/configtls.go#L32)
- `routing`:
- `routing_key` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): Routing key used to route exported messages to RabbitMQ consumers
- `exchange`: Name of the exchange used to route messages. If omitted, the [default exchange](https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-default) is used which routes to a queue with the same as the routing key. Only [direct exchanges](https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-direct) are currently supported. Note that this component does not handle queue creation or binding.
- `durable` (default = true): Whether to instruct RabbitMQ to make messages [durable](https://www.rabbitmq.com/docs/queues#durability) by writing to disk
- `message_body_encoding`: (default = "otlp_proto"): The encoding of telemetry sent to RabbitMQ
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- `otlp_json`: ** EXPERIMENTAL ** payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- `encoding_extension`: (defaults to OTLP protobuf format): ID of the [encoding extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/encoding) to use to marshal data
- `retry_on_failure`:
- `enabled` (default = false)

Expand All @@ -40,7 +43,12 @@ exporters:
connection:
endpoint: amqp://localhost:5672
auth:
sasl:
plain:
username: user
password: pass
encoding_extension: otlp_encoding/rabbitmq

extensions:
otlp_encoding/rabbitmq:
protocol: otlp_json
```
30 changes: 24 additions & 6 deletions exporter/rabbitmqexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,42 @@
package rabbitmqexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter"

import (
"errors"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/config/configtls"
)

type Config struct {
Connection ConnectionConfig `mapstructure:"connection"`
Routing RoutingConfig `mapstructure:"routing"`
MessageBodyEncoding string `mapstructure:"message_body_encoding"`
EncodingExtensionID *component.ID `mapstructure:"encoding_extension"`
Durable bool `mapstructure:"durable"`
RetrySettings configretry.BackOffConfig `mapstructure:"retry_on_failure"`
}

type ConnectionConfig struct {
Endpoint string `mapstructure:"endpoint"`
VHost string `mapstructure:"vhost"`
Auth AuthConfig `mapstructure:"auth"`
Endpoint string `mapstructure:"endpoint"`
VHost string `mapstructure:"vhost"`
TLSConfig *configtls.ClientConfig `mapstructure:"tls"`
Auth AuthConfig `mapstructure:"auth"`
ConnectionTimeout time.Duration `mapstructure:"connection_timeout"`
Heartbeat time.Duration `mapstructure:"heartbeat"`
PublishConfirmationTimeout time.Duration `mapstructure:"publish_confirmation_timeout"`
}

type RoutingConfig struct {
Exchange string `mapstructure:"exchange"`
RoutingKey string `mapstructure:"routing_key"`
}

type AuthConfig struct {
SASL SASLConfig `mapstructure:"sasl"`
Plain PlainAuth `mapstructure:"plain"`
}

type SASLConfig struct {
type PlainAuth struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
}
Expand All @@ -39,5 +48,14 @@ var _ component.Config = (*Config)(nil)

// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
if cfg.Connection.Endpoint == "" {
return errors.New("connection.endpoint is required")
}

// Password-less users are possible so only validate username
if cfg.Connection.Auth.Plain.Username == "" {
return errors.New("connection.auth.plain.username is required")
}

return nil
}
48 changes: 38 additions & 10 deletions exporter/rabbitmqexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,69 @@
package rabbitmqexporter

import (
"errors"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap/confmaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/metadata"
)

var encodingComponentID = component.NewIDWithName(component.MustNewType("otlp_encoding"), "rabbitmq123")

func TestLoadConfig(t *testing.T) {
t.Parallel()

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "test-config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
expected component.Config
id component.ID
expected component.Config
errorMessage string
}{
{
id: component.NewIDWithName(metadata.Type, ""),
expected: createDefaultConfig().(*Config),
id: component.NewIDWithName(metadata.Type, "missing_endpoint"),
errorMessage: "connection.endpoint is required",
},
{
id: component.NewIDWithName(metadata.Type, "missing_plainauth_username"),
errorMessage: "connection.auth.plain.username is required",
},
{
id: component.NewIDWithName(metadata.Type, "all_fields"),
expected: &Config{
Connection: ConnectionConfig{
Endpoint: "amqp://localhost:5672",
Endpoint: "amqps://localhost:5672",
VHost: "vhost1",
Auth: AuthConfig{
SASL: SASLConfig{
Plain: PlainAuth{
Username: "user",
Password: "pass",
},
},
TLSConfig: &configtls.ClientConfig{
Config: configtls.Config{
CAFile: "cert123",
},
Insecure: true,
},
ConnectionTimeout: time.Millisecond,
Heartbeat: time.Millisecond * 2,
PublishConfirmationTimeout: time.Millisecond * 3,
},
Routing: RoutingConfig{
Exchange: "amq.direct",
RoutingKey: "custom_routing_key",
},
MessageBodyEncoding: "otlp_json",
EncodingExtensionID: &encodingComponentID,
Durable: false,
RetrySettings: configretry.BackOffConfig{
Enabled: true,
Expand All @@ -60,14 +80,16 @@ func TestLoadConfig(t *testing.T) {
Endpoint: "amqp://localhost:5672",
VHost: "",
Auth: AuthConfig{
SASL: SASLConfig{
Plain: PlainAuth{
Username: "user",
Password: "pass",
},
},
ConnectionTimeout: defaultConnectionTimeout,
Heartbeat: defaultConnectionHeartbeat,
PublishConfirmationTimeout: defaultPublishConfirmationTimeout,
},
MessageBodyEncoding: "otlp_proto",
Durable: true,
Durable: true,
RetrySettings: configretry.BackOffConfig{
Enabled: false,
},
Expand All @@ -84,6 +106,12 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))

if tt.expected == nil {
err = errors.Join(err, component.ValidateConfig(cfg))
assert.ErrorContains(t, err, tt.errorMessage)
return
}

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg)
})
Expand Down
66 changes: 56 additions & 10 deletions exporter/rabbitmqexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package rabbitmqexporter // import "github.com/open-telemetry/opentelemetry-coll

import (
"context"
"crypto/tls"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
Expand All @@ -13,10 +15,21 @@ import (
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher"
)

const (
defaultEncoding = "otlp_proto"
defaultConnectionTimeout = time.Second * 10
defaultConnectionHeartbeat = time.Second * 5
defaultPublishConfirmationTimeout = time.Second * 5

spansRoutingKey = "otlp_spans"
metricsRoutingKey = "otlp_metrics"
logsRoutingKey = "otlp_logs"

spansConnectionName = "otel-collector-spans"
metricsConnectionName = "otel-collector-metrics"
logsConnectionName = "otel-collector-logs"
)

func NewFactory() exporter.Factory {
Expand All @@ -34,9 +47,13 @@ func createDefaultConfig() component.Config {
Enabled: false,
}
return &Config{
MessageBodyEncoding: defaultEncoding,
Durable: true,
RetrySettings: retrySettings,
Durable: true,
RetrySettings: retrySettings,
Connection: ConnectionConfig{
ConnectionTimeout: defaultConnectionTimeout,
Heartbeat: defaultConnectionHeartbeat,
PublishConfirmationTimeout: defaultPublishConfirmationTimeout,
},
}
}

Expand All @@ -46,13 +63,15 @@ func createTracesExporter(
cfg component.Config,
) (exporter.Traces, error) {
config := cfg.(*Config)
r := newRabbitmqExporter(config, set.TelemetrySettings)

routingKey := getRoutingKeyOrDefault(config, spansRoutingKey)
r := newRabbitmqExporter(config, set.TelemetrySettings, newPublisherFactory(set), newTLSFactory(config), routingKey, spansConnectionName)

return exporterhelper.NewTracesExporter(
ctx,
set,
cfg,
r.pushTraces,
r.publishTraces,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithStart(r.start),
exporterhelper.WithShutdown(r.shutdown),
Expand All @@ -66,13 +85,15 @@ func createMetricsExporter(
cfg component.Config,
) (exporter.Metrics, error) {
config := (cfg.(*Config))
r := newRabbitmqExporter(config, set.TelemetrySettings)

routingKey := getRoutingKeyOrDefault(config, metricsRoutingKey)
r := newRabbitmqExporter(config, set.TelemetrySettings, newPublisherFactory(set), newTLSFactory(config), routingKey, metricsConnectionName)

return exporterhelper.NewMetricsExporter(
ctx,
set,
cfg,
r.pushMetrics,
r.publishMetrics,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithStart(r.start),
exporterhelper.WithShutdown(r.shutdown),
Expand All @@ -86,16 +107,41 @@ func createLogsExporter(
cfg component.Config,
) (exporter.Logs, error) {
config := (cfg.(*Config))
r := newRabbitmqExporter(config, set.TelemetrySettings)

routingKey := getRoutingKeyOrDefault(config, logsRoutingKey)
r := newRabbitmqExporter(config, set.TelemetrySettings, newPublisherFactory(set), newTLSFactory(config), routingKey, logsConnectionName)

return exporterhelper.NewLogsExporter(
ctx,
set,
cfg,
r.pushLogs,
r.publishLogs,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithStart(r.start),
exporterhelper.WithShutdown(r.shutdown),
exporterhelper.WithRetry(config.RetrySettings),
)
}

func getRoutingKeyOrDefault(config *Config, fallback string) string {
routingKey := fallback
if config.Routing.RoutingKey != "" {
routingKey = config.Routing.RoutingKey
}
return routingKey
}

func newPublisherFactory(set exporter.CreateSettings) publisherFactory {
return func(dialConfig publisher.DialConfig) (publisher.Publisher, error) {
return publisher.NewConnection(set.Logger, publisher.NewAmqpClient(), dialConfig)
}
}

func newTLSFactory(config *Config) tlsFactory {
if config.Connection.TLSConfig != nil {
return config.Connection.TLSConfig.LoadTLSConfig
}
return func(context.Context) (*tls.Config, error) {
return nil, nil
}
}
Loading