Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Oct 26, 2022
1 parent b87942e commit af859b6
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 4 deletions.
97 changes: 97 additions & 0 deletions component/componenttest/nop_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package componenttest // import "go.opentelemetry.io/collector/component/componenttest"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
)

// NewNopConnectorCreateSettings returns a new nop settings for Create*Connector functions.
func NewNopConnectorCreateSettings() component.ConnectorCreateSettings {
return component.ConnectorCreateSettings{
TelemetrySettings: NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
}
}

type nopConnectorConfig struct {
config.ConnectorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

// NewNopConnectorFactory returns a component.ConnectorFactory that constructs nop processors.
func NewNopConnectorFactory() component.ConnectorFactory {
return component.NewConnectorFactory(
"nop",
func() config.Connector {
return &nopConnectorConfig{
ConnectorSettings: config.NewConnectorSettings(config.NewComponentID("nop")),
}
},
component.WithTracesConnector(createTracesConnector, component.StabilityLevelStable),
component.WithTracesToMetricsConnector(createTracesToMetricsConnector, component.StabilityLevelStable),
component.WithTracesToLogsConnector(createTracesToLogsConnector, component.StabilityLevelStable),
component.WithMetricsConnector(createMetricsConnector, component.StabilityLevelStable),
component.WithMetricsToTracesConnector(createMetricsToTracesConnector, component.StabilityLevelStable),
component.WithMetricsToLogsConnector(createMetricsToLogsConnector, component.StabilityLevelStable),
component.WithLogsConnector(createLogsConnector, component.StabilityLevelStable),
component.WithLogsToTracesConnector(createLogsToTracesConnector, component.StabilityLevelStable),
component.WithLogsToMetricsConnector(createLogsToMetricsConnector, component.StabilityLevelStable),
)
}

func createTracesConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Traces) (component.TracesConnector, error) {
return nopConnectorInstance, nil
}
func createTracesToMetricsConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Metrics) (component.TracesToMetricsConnector, error) {
return nopConnectorInstance, nil
}
func createTracesToLogsConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Logs) (component.TracesToLogsConnector, error) {
return nopConnectorInstance, nil
}

func createMetricsConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Metrics) (component.MetricsConnector, error) {
return nopConnectorInstance, nil
}
func createMetricsToTracesConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Traces) (component.MetricsToTracesConnector, error) {
return nopConnectorInstance, nil
}
func createMetricsToLogsConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Logs) (component.MetricsToLogsConnector, error) {
return nopConnectorInstance, nil
}

func createLogsConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Logs) (component.LogsConnector, error) {
return nopConnectorInstance, nil
}
func createLogsToTracesConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Traces) (component.LogsToTracesConnector, error) {
return nopConnectorInstance, nil
}
func createLogsToMetricsConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Metrics) (component.LogsToMetricsConnector, error) {
return nopConnectorInstance, nil
}

var nopConnectorInstance = &nopConnector{
Consumer: consumertest.NewNop(),
}

// nopConnector stores consumed traces and metrics for testing purposes.
type nopConnector struct {
nopComponent
consumertest.Consumer
}
91 changes: 91 additions & 0 deletions component/componenttest/nop_connector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package componenttest

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestNewNopConnectorFactory(t *testing.T) {
factory := NewNopConnectorFactory()
require.NotNil(t, factory)
assert.Equal(t, config.Type("nop"), factory.Type())
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &nopConnectorConfig{ConnectorSettings: config.NewConnectorSettings(config.NewComponentID("nop"))}, cfg)

traces, err := factory.CreateTracesConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))

tracesToMetrics, err := factory.CreateTracesToMetricsConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, tracesToMetrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, tracesToMetrics.ConsumeTraces(context.Background(), ptrace.NewTraces()))
assert.NoError(t, tracesToMetrics.Shutdown(context.Background()))

tracesToLogs, err := factory.CreateTracesToLogsConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, tracesToLogs.Start(context.Background(), NewNopHost()))
assert.NoError(t, tracesToLogs.ConsumeTraces(context.Background(), ptrace.NewTraces()))
assert.NoError(t, tracesToLogs.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))

metricsToTraces, err := factory.CreateMetricsToTracesConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, metricsToTraces.Start(context.Background(), NewNopHost()))
assert.NoError(t, metricsToTraces.ConsumeMetrics(context.Background(), pmetric.NewMetrics()))
assert.NoError(t, metricsToTraces.Shutdown(context.Background()))

metricsToLogs, err := factory.CreateMetricsToLogsConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, metricsToLogs.Start(context.Background(), NewNopHost()))
assert.NoError(t, metricsToLogs.ConsumeMetrics(context.Background(), pmetric.NewMetrics()))
assert.NoError(t, metricsToLogs.Shutdown(context.Background()))

logs, err := factory.CreateLogsConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs()))
assert.NoError(t, logs.Shutdown(context.Background()))

logsToTraces, err := factory.CreateLogsToTracesConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, logsToTraces.Start(context.Background(), NewNopHost()))
assert.NoError(t, logsToTraces.ConsumeLogs(context.Background(), plog.NewLogs()))
assert.NoError(t, logsToTraces.Shutdown(context.Background()))

logsToMetrics, err := factory.CreateLogsToMetricsConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, logsToMetrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, logsToMetrics.ConsumeLogs(context.Background(), plog.NewLogs()))
assert.NoError(t, logsToMetrics.Shutdown(context.Background()))
}
9 changes: 9 additions & 0 deletions component/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Connector interface {
// then process and export the trace to the appropriate backend.
type TracesConnector interface {
Connector
consumer.Traces
}

// A TracesToMetricsConnector acts as an exporter from a traces pipeline and a receiver to a metrics pipeline.
Expand All @@ -44,6 +45,7 @@ type TracesConnector interface {
// For example traces could be summarized by a metrics connector that emits statistics describing the traces observed.
type TracesToMetricsConnector interface {
Connector
consumer.Traces
}

// A TracesToLogsConnector acts as an exporter from a traces pipeline and a receiver to a logs pipeline.
Expand All @@ -53,6 +55,7 @@ type TracesToMetricsConnector interface {
// For example traces could be analyzed by a logs connector that emits events when particular criteria are met.
type TracesToLogsConnector interface {
Connector
consumer.Traces
}

// A MetricsConnector sends metrics from one pipeline to another.
Expand All @@ -64,6 +67,7 @@ type TracesToLogsConnector interface {
// then process and export the metric to the appropriate backend.
type MetricsConnector interface {
Connector
consumer.Metrics
}

// A MetricsToTracesConnector acts as an exporter from a metrics pipeline and a receiver to a traces pipeline.
Expand All @@ -73,6 +77,7 @@ type MetricsConnector interface {
// For example latency between related data points could be modeled and emitted as traces.
type MetricsToTracesConnector interface {
Connector
consumer.Metrics
}

// A MetricsToLogsConnector acts as an exporter from a metrics pipeline and a receiver to a logs pipeline.
Expand All @@ -82,6 +87,7 @@ type MetricsToTracesConnector interface {
// For example metrics could be analyzed by a logs connector that emits events when particular criteria are met.
type MetricsToLogsConnector interface {
Connector
consumer.Metrics
}

// A LogsConnector sends logs from one pipeline to another.
Expand All @@ -93,6 +99,7 @@ type MetricsToLogsConnector interface {
// then process and export the log to the appropriate backend.
type LogsConnector interface {
Connector
consumer.Logs
}

// A LogsToTracesConnector acts as an exporter from a logs pipeline and a receiver to a traces pipeline.
Expand All @@ -102,6 +109,7 @@ type LogsConnector interface {
// For example structured logs containing span information could be consumed and emitted as traces.
type LogsToTracesConnector interface {
Connector
consumer.Logs
}

// A LogsToMetricsConnector acts as an exporter from a logs pipeline and a receiver to a metrics pipeline.
Expand All @@ -111,6 +119,7 @@ type LogsToTracesConnector interface {
// For example metrics could be extracted from structured logs that contain numeric data.
type LogsToMetricsConnector interface {
Connector
consumer.Logs
}

// ConnectorCreateSettings configures Connector creators.
Expand Down
1 change: 1 addition & 0 deletions service/config_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var configNop = &Config{
Processors: map[config.ComponentID]config.Processor{config.NewComponentID("nop"): componenttest.NewNopProcessorFactory().CreateDefaultConfig()},
Exporters: map[config.ComponentID]config.Exporter{config.NewComponentID("nop"): componenttest.NewNopExporterFactory().CreateDefaultConfig()},
Extensions: map[config.ComponentID]config.Extension{config.NewComponentID("nop"): componenttest.NewNopExtensionFactory().CreateDefaultConfig()},
Connectors: map[config.ComponentID]config.Connector{},
Service: ConfigService{
Extensions: []config.ComponentID{config.NewComponentID("nop")},
Pipelines: map[config.ComponentID]*ConfigServicePipeline{
Expand Down
20 changes: 17 additions & 3 deletions service/internal/pipelines/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,27 @@ func (g *pipelinesGraph) ShutdownAll(ctx context.Context) error {

func (g *pipelinesGraph) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter {
exportersMap := make(map[config.DataType]map[config.ComponentID]component.Exporter)

exportersMap[config.TracesDataType] = make(map[config.ComponentID]component.Exporter)
exportersMap[config.MetricsDataType] = make(map[config.ComponentID]component.Exporter)
exportersMap[config.LogsDataType] = make(map[config.ComponentID]component.Exporter)

// TODO

for _, pg := range g.pipelineGraphs {
for _, expNode := range pg.exporters {
expOrConnNode := g.componentGraph.Node(expNode.ID())
expNode, ok := expOrConnNode.(*exporterNode)
if !ok {
continue
}
switch expNode.pipelineType {
case config.TracesDataType:
exportersMap[config.TracesDataType][expNode.componentID] = expNode.Component
case config.MetricsDataType:
exportersMap[config.MetricsDataType][expNode.componentID] = expNode.Component
case config.LogsDataType:
exportersMap[config.LogsDataType][expNode.componentID] = expNode.Component
}
}
}
return exportersMap
}

Expand Down
2 changes: 1 addition & 1 deletion service/testdata/otelcol-nop.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ service:
logs:
receivers: [nop]
processors: [nop]
exporters: [nop]
exporters: [nop]

0 comments on commit af859b6

Please sign in to comment.