Skip to content

Commit

Permalink
[chore] Add internal attribute package
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Jan 10, 2025
1 parent 0ece678 commit 18debdd
Show file tree
Hide file tree
Showing 2 changed files with 305 additions and 0 deletions.
112 changes: 112 additions & 0 deletions service/internal/attribute/attribute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package attribute // import "go.opentelemetry.io/collector/service/internal/attribute"

import (
"hash/fnv"

"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pipeline"
)

const (
componentKindKey = "otelcol.component.kind"
componentIDKey = "otelcol.component.id"
pipelineIDKey = "otelcol.pipeline.id"
signalKey = "otelcol.signal"
signalOutputKey = "otelcol.signal.output"

capabiltiesKind = "capabilities"
fanoutKind = "fanout"
)

type Attributes struct {
set attribute.Set
id int64
}

func newAttributes(attrs ...attribute.KeyValue) *Attributes {
h := fnv.New64a()
for _, kv := range attrs {
h.Write([]byte("(" + string(kv.Key) + "|" + kv.Value.AsString() + ")"))
}
return &Attributes{
set: attribute.NewSet(attrs...),
id: int64(h.Sum64()), // #nosec G115
}
}

func (a Attributes) Attributes() *attribute.Set {
return &a.set
}

func (a Attributes) ID() int64 {
return a.id
}

func (a Attributes) Logger(logger *zap.Logger) *zap.Logger {
fields := make([]zap.Field, 0, a.set.Len())
for _, kv := range a.set.ToSlice() {
fields = append(fields, zap.String(string(kv.Key), kv.Value.AsString()))
}
return logger.With(fields...)

Check warning on line 56 in service/internal/attribute/attribute.go

View check run for this annotation

Codecov / codecov/patch

service/internal/attribute/attribute.go#L51-L56

Added lines #L51 - L56 were not covered by tests
}

func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, component.KindReceiver.String()),
attribute.String(signalKey, pipelineType.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Processor(pipelineID pipeline.ID, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, component.KindProcessor.String()),
attribute.String(signalKey, pipelineID.Signal().String()),
attribute.String(pipelineIDKey, pipelineID.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, component.KindExporter.String()),
attribute.String(signalKey, pipelineType.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, component.KindConnector.String()),
attribute.String(signalKey, exprPipelineType.String()),
attribute.String(signalOutputKey, rcvrPipelineType.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Capabilities(pipelineID pipeline.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, capabiltiesKind),
attribute.String(pipelineIDKey, pipelineID.String()),
)
}

func Fanout(pipelineID pipeline.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, fanoutKind),
attribute.String(pipelineIDKey, pipelineID.String()),
)
}

func Extension(id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, component.KindExtension.String()),
attribute.String(componentIDKey, id.String()),
)
}
193 changes: 193 additions & 0 deletions service/internal/attribute/attribute_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package attribute

import (
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/pipeline/xpipeline"
)

var (
signals = []pipeline.Signal{
pipeline.SignalTraces,
pipeline.SignalMetrics,
pipeline.SignalLogs,
xpipeline.SignalProfiles,
}

cIDs = []component.ID{
component.MustNewID("foo"),
component.MustNewID("foo2"),
component.MustNewID("bar"),
}

pIDs = []pipeline.ID{
pipeline.MustNewID("traces"),
pipeline.MustNewIDWithName("traces", "2"),
pipeline.MustNewID("metrics"),
pipeline.MustNewIDWithName("metrics", "2"),
pipeline.MustNewID("logs"),
pipeline.MustNewIDWithName("logs", "2"),
pipeline.MustNewID("profiles"),
pipeline.MustNewIDWithName("profiles", "2"),
}
)

func TestReceiver(t *testing.T) {
for _, sig := range signals {
for _, id := range cIDs {
r := Receiver(sig, id)
componentKind, ok := r.Attributes().Value(componentKindKey)
require.True(t, ok)
require.Equal(t, component.KindReceiver.String(), componentKind.AsString())

signal, ok := r.Attributes().Value(signalKey)
require.True(t, ok)
require.Equal(t, sig.String(), signal.AsString())

componentID, ok := r.Attributes().Value(componentIDKey)
require.True(t, ok)
require.Equal(t, id.String(), componentID.AsString())
}
}
}

func TestProcessor(t *testing.T) {
for _, pID := range pIDs {
for _, id := range cIDs {
p := Processor(pID, id)
componentKind, ok := p.Attributes().Value(componentKindKey)
require.True(t, ok)
require.Equal(t, component.KindProcessor.String(), componentKind.AsString())

pipelineID, ok := p.Attributes().Value(pipelineIDKey)
require.True(t, ok)
require.Equal(t, pID.String(), pipelineID.AsString())

componentID, ok := p.Attributes().Value(componentIDKey)
require.True(t, ok)
require.Equal(t, id.String(), componentID.AsString())
}
}
}

func TestExporter(t *testing.T) {
for _, sig := range signals {
for _, id := range cIDs {
e := Exporter(sig, id)
componentKind, ok := e.Attributes().Value(componentKindKey)
require.True(t, ok)
require.Equal(t, component.KindExporter.String(), componentKind.AsString())

signal, ok := e.Attributes().Value(signalKey)
require.True(t, ok)
require.Equal(t, sig.String(), signal.AsString())

componentID, ok := e.Attributes().Value(componentIDKey)
require.True(t, ok)
require.Equal(t, id.String(), componentID.AsString())
}
}
}

func TestConnector(t *testing.T) {
for _, exprSig := range signals {
for _, rcvrSig := range signals {
for _, id := range cIDs {
c := Connector(exprSig, rcvrSig, id)
componentKind, ok := c.Attributes().Value(componentKindKey)
require.True(t, ok)
require.Equal(t, component.KindConnector.String(), componentKind.AsString())

signal, ok := c.Attributes().Value(signalKey)
require.True(t, ok)
require.Equal(t, exprSig.String(), signal.AsString())

signalOutput, ok := c.Attributes().Value(signalOutputKey)
require.True(t, ok)
require.Equal(t, rcvrSig.String(), signalOutput.AsString())

componentID, ok := c.Attributes().Value(componentIDKey)
require.True(t, ok)
require.Equal(t, id.String(), componentID.AsString())
}
}
}
}

func TestExtension(t *testing.T) {
e := Extension(component.MustNewID("foo"))
componentKind, ok := e.Attributes().Value(componentKindKey)
require.True(t, ok)
require.Equal(t, component.KindExtension.String(), componentKind.AsString())
}

func TestSetEquality(t *testing.T) {
// The sets are created independently but should be exactly equivalent.
// We will ensure that corresponding elements are equal and that
// non-corresponding elements are not equal.
setI, setJ := createExampleSets(), createExampleSets()
for i, ei := range setI {
for j, ej := range setJ {
if i == j {
require.Equal(t, ei.ID(), ej.ID())
require.True(t, ei.Attributes().Equals(ej.Attributes()))
} else {
require.NotEqual(t, ei.ID(), ej.ID())
require.False(t, ei.Attributes().Equals(ej.Attributes()))
}
}
}
}

func createExampleSets() []*Attributes {
sets := []*Attributes{}

// Receiver examples.
for _, sig := range signals {
for _, id := range cIDs {
sets = append(sets, Receiver(sig, id))
}
}

// Processor examples.
for _, pID := range pIDs {
for _, cID := range cIDs {
sets = append(sets, Processor(pID, cID))
}
}

// Exporter examples.
for _, sig := range signals {
for _, id := range cIDs {
sets = append(sets, Exporter(sig, id))
}
}

// Connector examples.
for _, exprSig := range signals {
for _, rcvrSig := range signals {
for _, id := range cIDs {
sets = append(sets, Connector(exprSig, rcvrSig, id))
}
}
}

// Capabilities examples.
for _, pID := range pIDs {
sets = append(sets, Capabilities(pID))
}

// Fanout examples.
for _, pID := range pIDs {
sets = append(sets, Fanout(pID))
}

return sets
}

0 comments on commit 18debdd

Please sign in to comment.