From 3711c01b0c35f3fb854b6a8e079cfaa688021bc2 Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Sat, 11 Jul 2020 19:33:38 -0700 Subject: [PATCH] Migrate Resource Processor to internal data model (#1315) This commit migrates resource processor to internal data model. Existing processor configuration is relevant only to OpenCensus format, so the configuration schema has to be changed. Taking this opportunity, this commit adds existing logic for attributes manipulation from attributes processor to resource processor. New config uses "attributes" field which represents actions that can be made on resource attributes. Supported actions: INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT. In order to migrate existing resource processor config: 1. Move key/values from "labels" field to "attributes" with action="upsert". 2. Add value from "type" field to "attributes" with key="opencensus.resourcetype" and action="upsert". --- internal/processor/attraction/attraction.go | 2 +- processor/attributesprocessor/config.go | 2 +- processor/resourceprocessor/README.md | 23 +- processor/resourceprocessor/config.go | 12 +- processor/resourceprocessor/config_test.go | 26 +- processor/resourceprocessor/factory.go | 78 ++++- processor/resourceprocessor/factory_test.go | 78 ++++- .../resourceprocessor/resource_processor.go | 110 +++--- .../resource_processor_test.go | 314 ++++++------------ .../resourceprocessor/testdata/config.yaml | 35 +- testbed/tests/resource_processor_test.go | 107 ++---- 11 files changed, 375 insertions(+), 412 deletions(-) diff --git a/internal/processor/attraction/attraction.go b/internal/processor/attraction/attraction.go index b5d4197b256..367ef2e0e3d 100644 --- a/internal/processor/attraction/attraction.go +++ b/internal/processor/attraction/attraction.go @@ -26,7 +26,7 @@ import ( // Settings type Settings struct { // Actions specifies the list of attributes to act on. - // The set of actions are {INSERT, UPDATE, UPSERT, DELETE}. + // The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}. // This is a required field. Actions []ActionKeyValue `mapstructure:"actions"` } diff --git a/processor/attributesprocessor/config.go b/processor/attributesprocessor/config.go index 37d38a5b30d..e097df3bade 100644 --- a/processor/attributesprocessor/config.go +++ b/processor/attributesprocessor/config.go @@ -33,7 +33,7 @@ type Config struct { filterspan.MatchConfig `mapstructure:",squash"` // Specifies the list of attributes to act on. - // The set of actions are {INSERT, UPDATE, UPSERT, DELETE}. + // The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}. // This is a required field. attraction.Settings `mapstructure:",squash"` } diff --git a/processor/resourceprocessor/README.md b/processor/resourceprocessor/README.md index 053b4006de2..0afe0acca21 100644 --- a/processor/resourceprocessor/README.md +++ b/processor/resourceprocessor/README.md @@ -2,25 +2,26 @@ Supported pipeline types: metrics, traces -The resource processor can be used to override a resource. +The resource processor can be used to apply changes on resource attributes. Please refer to [config.go](./config.go) for the config spec. -The following configuration options are required: -- `type`: Resource type to be applied. If specified, this value overrides the -original resource type. Otherwise, the original resource type is kept. -- `labels`: Map of key/value pairs that should be added to the resource. +`attributes` represents actions that can be applied on resource attributes. +See processor/attributesprocessor/README.md for more details on supported attributes actions. Examples: ```yaml processors: resource: - type: "host" - labels: { - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - } + attributes: + - key: cloud.zone + value: "zone-1" + action: upsert + - key: k8s.cluster.name + from_attribute: k8s-cluster + action: insert + - key: redundant-attribute + action: delete ``` Refer to [config.yaml](./testdata/config.yaml) for detailed diff --git a/processor/resourceprocessor/config.go b/processor/resourceprocessor/config.go index 51b797a0c35..4c5e020c16c 100644 --- a/processor/resourceprocessor/config.go +++ b/processor/resourceprocessor/config.go @@ -16,14 +16,20 @@ package resourceprocessor import ( "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/internal/processor/attraction" ) // Config defines configuration for Resource processor. type Config struct { configmodels.ProcessorSettings `mapstructure:",squash"` - // ResourceType overrides the original resource type. + + // AttributesActions specifies the list of actions to be applied on resource attributes. + // The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}. + AttributesActions []attraction.ActionKeyValue `mapstructure:"attributes"` + + // ResourceType field is deprecated. Set "opencensus.type" key in "attributes.upsert" map instead. ResourceType string `mapstructure:"type"` - // Labels specify static labels to be added to resource. - // In case of a conflict the label will be overridden. + + // Labels field is deprecated. Use "attributes.upsert" instead. Labels map[string]string `mapstructure:"labels"` } diff --git a/processor/resourceprocessor/config_test.go b/processor/resourceprocessor/config_test.go index 4a7c33e42e7..21182260a4b 100644 --- a/processor/resourceprocessor/config_test.go +++ b/processor/resourceprocessor/config_test.go @@ -22,33 +22,35 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/internal/processor/attraction" ) func TestLoadConfig(t *testing.T) { factories, err := config.ExampleComponents() assert.NoError(t, err) - factory := &Factory{} factories.Processors[typeStr] = &Factory{} cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) assert.NoError(t, err) assert.NotNil(t, cfg) - p1 := cfg.Processors["resource"] - assert.Equal(t, p1, factory.CreateDefaultConfig()) - - p2 := cfg.Processors["resource/2"] - assert.Equal(t, p2, &Config{ + assert.Equal(t, cfg.Processors["resource"], &Config{ ProcessorSettings: configmodels.ProcessorSettings{ TypeVal: "resource", - NameVal: "resource/2", + NameVal: "resource", + }, + AttributesActions: []attraction.ActionKeyValue{ + {Key: "cloud.zone", Value: "zone-1", Action: attraction.UPSERT}, + {Key: "k8s.cluster.name", FromAttribute: "k8s-cluster", Action: attraction.INSERT}, + {Key: "redundant-attribute", Action: attraction.DELETE}, }, - ResourceType: "host", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", + }) + + assert.Equal(t, cfg.Processors["resource/invalid"], &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource/invalid", }, }) } diff --git a/processor/resourceprocessor/factory.go b/processor/resourceprocessor/factory.go index f4275b692de..f428029b45c 100644 --- a/processor/resourceprocessor/factory.go +++ b/processor/resourceprocessor/factory.go @@ -15,11 +15,16 @@ package resourceprocessor import ( + "context" + "fmt" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/internal/processor/attraction" + "go.opentelemetry.io/collector/translator/conventions" ) const ( @@ -32,30 +37,83 @@ type Factory struct { } // Type gets the type of the Option config created by this factory. -func (Factory) Type() configmodels.Type { +func (*Factory) Type() configmodels.Type { return typeStr } // CreateDefaultConfig creates the default configuration for processor. -func (Factory) CreateDefaultConfig() configmodels.Processor { +// Note: This isn't a valid configuration because the processor would do no work. +func (*Factory) CreateDefaultConfig() configmodels.Processor { return &Config{ ProcessorSettings: configmodels.ProcessorSettings{ TypeVal: typeStr, NameVal: typeStr, }, - ResourceType: "", - Labels: map[string]string{}, } } // CreateTraceProcessor creates a trace processor based on this config. -func (Factory) CreateTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumerOld, cfg configmodels.Processor) (component.TraceProcessorOld, error) { - oCfg := cfg.(*Config) - return newResourceTraceProcessor(nextConsumer, oCfg), nil +func (*Factory) CreateTraceProcessor( + ctx context.Context, + params component.ProcessorCreateParams, + nextConsumer consumer.TraceConsumer, + cfg configmodels.Processor, +) (component.TraceProcessor, error) { + attrProc, err := createAttrProcessor(cfg.(*Config), params.Logger) + if err != nil { + return nil, err + } + return newResourceTraceProcessor(nextConsumer, attrProc), nil } // CreateMetricsProcessor creates a metrics processor based on this config. -func (Factory) CreateMetricsProcessor(logger *zap.Logger, nextConsumer consumer.MetricsConsumerOld, cfg configmodels.Processor) (component.MetricsProcessorOld, error) { - oCfg := cfg.(*Config) - return newResourceMetricProcessor(nextConsumer, oCfg), nil +func (*Factory) CreateMetricsProcessor( + ctx context.Context, + params component.ProcessorCreateParams, + nextConsumer consumer.MetricsConsumer, + cfg configmodels.Processor, +) (component.MetricsProcessor, error) { + attrProc, err := createAttrProcessor(cfg.(*Config), params.Logger) + if err != nil { + return nil, err + } + return newResourceMetricProcessor(nextConsumer, attrProc), nil +} + +func createAttrProcessor(cfg *Config, logger *zap.Logger) (*attraction.AttrProc, error) { + handleDeprecatedFields(cfg, logger) + if len(cfg.AttributesActions) == 0 { + return nil, fmt.Errorf("error creating \"%q\" processor due to missing required field \"attributes\"", cfg.Name()) + } + attrProc, err := attraction.NewAttrProc(&attraction.Settings{Actions: cfg.AttributesActions}) + if err != nil { + return nil, fmt.Errorf("error creating \"%q\" processor: %w", cfg.Name(), err) + } + return attrProc, nil +} + +// handleDeprecatedFields converts deprecated ResourceType and Labels fields into Attributes.Upsert +func handleDeprecatedFields(cfg *Config, logger *zap.Logger) { + + // Upsert value from deprecated ResourceType config to resource attributes with "opencensus.type" key + if cfg.ResourceType != "" { + logger.Warn("[DEPRECATED] \"type\" field is deprecated and will be removed in future release. " + + "Please set the value to \"attributes\" with key=opencensus.type and action=upsert.") + upsertResourceType := attraction.ActionKeyValue{ + Action: attraction.UPSERT, + Key: conventions.OCAttributeResourceType, + Value: cfg.ResourceType, + } + cfg.AttributesActions = append(cfg.AttributesActions, upsertResourceType) + } + + // Upsert values from deprecated Labels config to resource attributes + if len(cfg.Labels) > 0 { + logger.Warn("[DEPRECATED] \"labels\" field is deprecated and will be removed in future release. " + + "Please use \"attributes\" field instead.") + for k, v := range cfg.Labels { + action := attraction.ActionKeyValue{Action: attraction.UPSERT, Key: k, Value: v} + cfg.AttributesActions = append(cfg.AttributesActions, action) + } + } } diff --git a/processor/resourceprocessor/factory_test.go b/processor/resourceprocessor/factory_test.go index d038b861093..cb7927e2727 100644 --- a/processor/resourceprocessor/factory_test.go +++ b/processor/resourceprocessor/factory_test.go @@ -15,12 +15,16 @@ package resourceprocessor import ( + "context" "testing" "github.com/stretchr/testify/assert" "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcheck" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/internal/processor/attraction" ) func TestCreateDefaultConfig(t *testing.T) { @@ -32,13 +36,81 @@ func TestCreateDefaultConfig(t *testing.T) { func TestCreateProcessor(t *testing.T) { var factory Factory - cfg := factory.CreateDefaultConfig() + cfg := &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", + }, + AttributesActions: []attraction.ActionKeyValue{ + {Key: "cloud.zone", Value: "zone-1", Action: attraction.UPSERT}, + }, + } - tp, err := factory.CreateTraceProcessor(zap.NewNop(), nil, cfg) + tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) assert.NoError(t, err) assert.NotNil(t, tp) - mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg) + mp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) assert.NoError(t, err) assert.NotNil(t, mp) } + +func TestInvalidEmptyActions(t *testing.T) { + var factory Factory + cfg := factory.CreateDefaultConfig() + + _, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) + assert.Error(t, err) + + _, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) + assert.Error(t, err) +} + +func TestInvalidAttributeActions(t *testing.T) { + var factory Factory + cfg := &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", + }, + AttributesActions: []attraction.ActionKeyValue{ + {Key: "k", Value: "v", Action: "invalid-action"}, + }, + } + + _, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) + assert.Error(t, err) + + _, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) + assert.Error(t, err) +} + +func TestDeprecatedConfig(t *testing.T) { + cfg := &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", + }, + ResourceType: "host", + Labels: map[string]string{ + "cloud.zone": "zone-1", + }, + } + + handleDeprecatedFields(cfg, zap.NewNop()) + + assert.EqualValues(t, &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", + }, + ResourceType: "host", + Labels: map[string]string{ + "cloud.zone": "zone-1", + }, + AttributesActions: []attraction.ActionKeyValue{ + {Key: "opencensus.resourcetype", Value: "host", Action: attraction.UPSERT}, + {Key: "cloud.zone", Value: "zone-1", Action: attraction.UPSERT}, + }, + }, cfg) +} diff --git a/processor/resourceprocessor/resource_processor.go b/processor/resourceprocessor/resource_processor.go index 8faecb9d744..b1e5c3cee01 100644 --- a/processor/resourceprocessor/resource_processor.go +++ b/processor/resourceprocessor/resource_processor.go @@ -17,41 +17,42 @@ package resourceprocessor import ( "context" - resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" + "go.opentelemetry.io/collector/internal/processor/attraction" ) type resourceTraceProcessor struct { - resource *resourcepb.Resource - capabilities component.ProcessorCapabilities - next consumer.TraceConsumerOld + attrProc *attraction.AttrProc + next consumer.TraceConsumer } -func newResourceTraceProcessor(next consumer.TraceConsumerOld, cfg *Config) *resourceTraceProcessor { - resource := createResource(cfg) +func newResourceTraceProcessor(next consumer.TraceConsumer, attrProc *attraction.AttrProc) *resourceTraceProcessor { return &resourceTraceProcessor{ - next: next, - capabilities: component.ProcessorCapabilities{MutatesConsumedData: !isEmptyResource(resource)}, - resource: resource, + attrProc: attrProc, + next: next, } } // ConsumeTraceData implements the TraceProcessor interface -func (rtp *resourceTraceProcessor) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { - return rtp.next.ConsumeTraceData(ctx, consumerdata.TraceData{ - Node: td.Node, - Resource: mergeResource(td.Resource, rtp.resource), - Spans: td.Spans, - SourceFormat: td.SourceFormat, - }) +func (rtp *resourceTraceProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error { + rss := td.ResourceSpans() + for i := 0; i < rss.Len(); i++ { + resource := rss.At(i).Resource() + if resource.IsNil() { + resource.InitEmpty() + } + attrs := resource.Attributes() + rtp.attrProc.Process(attrs) + } + return rtp.next.ConsumeTraces(ctx, td) } // GetCapabilities returns the ProcessorCapabilities assocciated with the resource processor. func (rtp *resourceTraceProcessor) GetCapabilities() component.ProcessorCapabilities { - return rtp.capabilities + return component.ProcessorCapabilities{MutatesConsumedData: true} } // Start is invoked during service startup. @@ -65,23 +66,20 @@ func (*resourceTraceProcessor) Shutdown(context.Context) error { } type resourceMetricProcessor struct { - resource *resourcepb.Resource - capabilities component.ProcessorCapabilities - next consumer.MetricsConsumerOld + attrProc *attraction.AttrProc + next consumer.MetricsConsumer } -func newResourceMetricProcessor(next consumer.MetricsConsumerOld, cfg *Config) *resourceMetricProcessor { - resource := createResource(cfg) +func newResourceMetricProcessor(next consumer.MetricsConsumer, attrProc *attraction.AttrProc) *resourceMetricProcessor { return &resourceMetricProcessor{ - resource: resource, - capabilities: component.ProcessorCapabilities{MutatesConsumedData: !isEmptyResource(resource)}, - next: next, + attrProc: attrProc, + next: next, } } // GetCapabilities returns the ProcessorCapabilities assocciated with the resource processor. func (rmp *resourceMetricProcessor) GetCapabilities() component.ProcessorCapabilities { - return rmp.capabilities + return component.ProcessorCapabilities{MutatesConsumedData: true} } // Start is invoked during service startup. @@ -95,52 +93,18 @@ func (*resourceMetricProcessor) Shutdown(context.Context) error { } // ConsumeMetricsData implements the MetricsProcessor interface -func (rmp *resourceMetricProcessor) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { - return rmp.next.ConsumeMetricsData(ctx, consumerdata.MetricsData{ - Node: md.Node, - Resource: mergeResource(md.Resource, rmp.resource), - Metrics: md.Metrics, - }) -} - -func createResource(cfg *Config) *resourcepb.Resource { - rpb := &resourcepb.Resource{ - Type: cfg.ResourceType, - Labels: map[string]string{}, - } - for k, v := range cfg.Labels { - rpb.Labels[k] = v - } - return rpb -} - -func mergeResource(to, from *resourcepb.Resource) *resourcepb.Resource { - if isEmptyResource(from) { - return to - } - if to == nil { - if from.Type == "" { - // Since resource without type would be invalid, we keep resource as nil - return nil - } - to = &resourcepb.Resource{Labels: map[string]string{}} - } - if from.Type != "" { - // Only change resource type if it was configured - to.Type = from.Type - } - if from.Labels != nil { - if to.Labels == nil { - to.Labels = make(map[string]string, len(from.Labels)) +func (rmp *resourceMetricProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { + imd := pdatautil.MetricsToInternalMetrics(md) + rms := imd.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + resource := rms.At(i).Resource() + if resource.IsNil() { + resource.InitEmpty() } - - for k, v := range from.Labels { - to.Labels[k] = v + if resource.Attributes().Len() == 0 { + resource.Attributes().InitEmptyWithCapacity(1) } + rmp.attrProc.Process(resource.Attributes()) } - return to -} - -func isEmptyResource(resource *resourcepb.Resource) bool { - return resource.Type == "" && len(resource.Labels) == 0 + return rmp.next.ConsumeMetrics(ctx, md) } diff --git a/processor/resourceprocessor/resource_processor_test.go b/processor/resourceprocessor/resource_processor_test.go index 0dbf8ffb8c6..4bfc08aeb7a 100644 --- a/processor/resourceprocessor/resource_processor_test.go +++ b/processor/resourceprocessor/resource_processor_test.go @@ -18,269 +18,175 @@ import ( "context" "testing" - resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/configmodels" - "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" + "go.opentelemetry.io/collector/internal/data/testdata" + "go.opentelemetry.io/collector/internal/processor/attraction" ) var ( - cfg = &Config{ - ProcessorSettings: configmodels.ProcessorSettings{ - TypeVal: "resource", - NameVal: "resource", - }, - ResourceType: "host", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - }, - } - - emptyCfg = &Config{ - ProcessorSettings: configmodels.ProcessorSettings{ - TypeVal: "resource", - NameVal: "resource", - }, - ResourceType: "", - Labels: map[string]string{}, + processorSettings = configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", } - cfgWithEmptyResourceType = &Config{ - ProcessorSettings: configmodels.ProcessorSettings{ - TypeVal: "resource", - NameVal: "resource", - }, - ResourceType: "", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - }, - } - - resource = &resourcepb.Resource{ - Type: "host", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - }, - } - - resource2 = &resourcepb.Resource{ - Type: "ht", - Labels: map[string]string{ - "zone": "zone-2", - "cluster": "cluster-2", - "host": "node-2", - }, - } - - mergedResource = &resourcepb.Resource{ - Type: "host", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - "zone": "zone-2", - "cluster": "cluster-2", - "host": "node-2", + cfg = &Config{ + ProcessorSettings: processorSettings, + AttributesActions: []attraction.ActionKeyValue{ + {Key: "cloud.zone", Value: "zone-1", Action: attraction.UPSERT}, + {Key: "k8s.cluster.name", FromAttribute: "k8s-cluster", Action: attraction.INSERT}, + {Key: "redundant-attribute", Action: attraction.DELETE}, }, } ) -func TestResourceProcessor(t *testing.T) { +func TestResourceProcessorAttributesUpsert(t *testing.T) { tests := []struct { - name string - config *Config - mutatesConsumedData bool - sourceResource *resourcepb.Resource - wantResource *resourcepb.Resource + name string + config *Config + sourceAttributes map[string]string + wantAttributes map[string]string }{ { - name: "Config with empty resource type doesn't mutate resource type", - config: cfgWithEmptyResourceType, - mutatesConsumedData: true, - sourceResource: &resourcepb.Resource{ - Type: "original-type", - Labels: map[string]string{ - "original-label": "original-value", - "cloud.zone": "will-be-overridden", - }, - }, - wantResource: &resourcepb.Resource{ - Type: "original-type", - Labels: map[string]string{ - "original-label": "original-value", - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - }, + name: "config_with_attributes_applied_on_nil_resource", + config: cfg, + sourceAttributes: nil, + wantAttributes: map[string]string{ + "cloud.zone": "zone-1", }, }, { - name: "Config with empty resource type keeps nil resource", - config: cfgWithEmptyResourceType, - mutatesConsumedData: true, - sourceResource: nil, - wantResource: nil, + name: "config_with_attributes_applied_on_empty_resource", + config: cfg, + sourceAttributes: map[string]string{}, + wantAttributes: map[string]string{ + "cloud.zone": "zone-1", + }, }, { - name: "Consumed resource with nil labels", - config: cfgWithEmptyResourceType, - mutatesConsumedData: true, - sourceResource: &resourcepb.Resource{ - Type: "original-type", + name: "config_attributes_applied_on_existing_resource_attributes", + config: cfg, + sourceAttributes: map[string]string{ + "cloud.zone": "to-be-replaced", + "k8s-cluster": "test-cluster", + "redundant-attribute": "to-be-removed", + }, + wantAttributes: map[string]string{ + "cloud.zone": "zone-1", + "k8s-cluster": "test-cluster", + "k8s.cluster.name": "test-cluster", }, - wantResource: &resourcepb.Resource{ - Type: "original-type", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", + }, + { + name: "config_attributes_replacement", + config: &Config{ + ProcessorSettings: processorSettings, + AttributesActions: []attraction.ActionKeyValue{ + {Key: "k8s.cluster.name", FromAttribute: "k8s-cluster", Action: attraction.INSERT}, + {Key: "k8s-cluster", Action: attraction.DELETE}, }, }, + sourceAttributes: map[string]string{ + "k8s-cluster": "test-cluster", + }, + wantAttributes: map[string]string{ + "k8s.cluster.name": "test-cluster", + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // Test trace consuner + // Test trace consumer ttn := &testTraceConsumer{} - rtp := newResourceTraceProcessor(ttn, tt.config) - assert.Equal(t, tt.mutatesConsumedData, rtp.GetCapabilities().MutatesConsumedData) + attrProc, err := attraction.NewAttrProc(&attraction.Settings{Actions: tt.config.AttributesActions}) + require.NoError(t, err) + + rtp := newResourceTraceProcessor(ttn, attrProc) + assert.Equal(t, true, rtp.GetCapabilities().MutatesConsumedData) - err := rtp.ConsumeTraceData(context.Background(), consumerdata.TraceData{ - Resource: tt.sourceResource, - }) + sourceTraceData := generateTraceData(tt.sourceAttributes) + wantTraceData := generateTraceData(tt.wantAttributes) + err = rtp.ConsumeTraces(context.Background(), sourceTraceData) require.NoError(t, err) - assert.Equal(t, tt.wantResource, ttn.td.Resource) + assert.EqualValues(t, wantTraceData, ttn.td) // Test metrics consumer tmn := &testMetricsConsumer{} - rmp := newResourceMetricProcessor(tmn, tt.config) - assert.Equal(t, tt.mutatesConsumedData, rmp.GetCapabilities().MutatesConsumedData) + rmp := newResourceMetricProcessor(tmn, attrProc) + assert.Equal(t, true, rtp.GetCapabilities().MutatesConsumedData) - err = rmp.ConsumeMetricsData(context.Background(), consumerdata.MetricsData{ - Resource: tt.sourceResource, - }) + sourceMetricData := generateMetricData(tt.sourceAttributes) + wantMetricData := generateMetricData(tt.wantAttributes) + err = rmp.ConsumeMetrics(context.Background(), sourceMetricData) require.NoError(t, err) - assert.Equal(t, tt.wantResource, tmn.md.Resource) + assert.EqualValues(t, wantMetricData, tmn.md) }) } } -func TestTraceResourceProcessor(t *testing.T) { - want := consumerdata.TraceData{ - Resource: resource, - } - test := consumerdata.TraceData{} - - ttn := &testTraceConsumer{} - rtp := newResourceTraceProcessor(ttn, cfg) - assert.True(t, rtp.GetCapabilities().MutatesConsumedData) - - rtp.ConsumeTraceData(context.Background(), test) - assert.Equal(t, ttn.td, want) -} - -func TestTraceResourceProcessorEmpty(t *testing.T) { - want := consumerdata.TraceData{ - Resource: resource2, +func generateTraceData(attributes map[string]string) pdata.Traces { + td := testdata.GenerateTraceDataOneSpanNoResource() + if attributes == nil { + return td } - test := consumerdata.TraceData{ - Resource: resource2, + resource := td.ResourceSpans().At(0).Resource() + resource.InitEmpty() + for k, v := range attributes { + resource.Attributes().InsertString(k, v) } - - ttn := &testTraceConsumer{} - rtp := newResourceTraceProcessor(ttn, emptyCfg) - assert.False(t, rtp.GetCapabilities().MutatesConsumedData) - - rtp.ConsumeTraceData(context.Background(), test) - assert.Equal(t, ttn.td, want) + resource.Attributes().Sort() + return td } -func TestTraceResourceProcessorNonEmptyIncomingResource(t *testing.T) { - want := consumerdata.TraceData{ - Resource: mergedResource, +func generateMetricData(attributes map[string]string) pdata.Metrics { + md := testdata.GenerateMetricDataOneMetricNoResource() + if attributes == nil { + return pdatautil.MetricsFromInternalMetrics(md) } - test := consumerdata.TraceData{ - Resource: resource2, + resource := md.ResourceMetrics().At(0).Resource() + resource.InitEmpty() + for k, v := range attributes { + resource.Attributes().InsertString(k, v) } - ttn := &testTraceConsumer{} - rtp := newResourceTraceProcessor(ttn, cfg) - rtp.ConsumeTraceData(context.Background(), test) - assert.Equal(t, ttn.td, want) -} - -func TestMetricResourceProcessor(t *testing.T) { - want := consumerdata.MetricsData{ - Resource: resource, - } - test := consumerdata.MetricsData{} - - tmn := &testMetricsConsumer{} - rmp := newResourceMetricProcessor(tmn, cfg) - assert.True(t, rmp.GetCapabilities().MutatesConsumedData) - - rmp.ConsumeMetricsData(context.Background(), test) - assert.Equal(t, tmn.md, want) -} - -func TestMetricResourceProcessorEmpty(t *testing.T) { - want := consumerdata.MetricsData{ - Resource: resource2, - } - test := consumerdata.MetricsData{ - Resource: resource2, - } - - tmn := &testMetricsConsumer{} - rmp := newResourceMetricProcessor(tmn, emptyCfg) - assert.False(t, rmp.GetCapabilities().MutatesConsumedData) - - rmp.ConsumeMetricsData(context.Background(), test) - assert.Equal(t, tmn.md, want) -} - -func TestMetricResourceProcessorNonEmptyIncomingResource(t *testing.T) { - want := consumerdata.MetricsData{ - Resource: mergedResource, - } - test := consumerdata.MetricsData{ - Resource: resource2, - } - - tmn := &testMetricsConsumer{} - rmp := newResourceMetricProcessor(tmn, cfg) - rmp.ConsumeMetricsData(context.Background(), test) - assert.Equal(t, tmn.md, want) -} - -func TestMergeResourceWithNilLabels(t *testing.T) { - resourceNilLabels := &resourcepb.Resource{Type: "host"} - assert.Nil(t, resourceNilLabels.Labels) - assert.Equal(t, mergeResource(nil, resourceNilLabels), &resourcepb.Resource{Type: "host", Labels: map[string]string{}}) + resource.Attributes().Sort() + return pdatautil.MetricsFromInternalMetrics(md) } type testTraceConsumer struct { - td consumerdata.TraceData + td pdata.Traces } -func (ttn *testTraceConsumer) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { +func (ttn *testTraceConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error { + // sort attributes to be able to compare traces + for i := 0; i < td.ResourceSpans().Len(); i++ { + sortResourceAttributes(td.ResourceSpans().At(i).Resource()) + } ttn.td = td return nil } type testMetricsConsumer struct { - md consumerdata.MetricsData + md pdata.Metrics } -func (tmn *testMetricsConsumer) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { +func (tmn *testMetricsConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { + // sort attributes to be able to compare traces + imd := pdatautil.MetricsToInternalMetrics(md) + for i := 0; i < imd.ResourceMetrics().Len(); i++ { + sortResourceAttributes(imd.ResourceMetrics().At(i).Resource()) + } tmn.md = md return nil } + +func sortResourceAttributes(resource pdata.Resource) { + if resource.IsNil() { + return + } + resource.Attributes().Sort() +} diff --git a/processor/resourceprocessor/testdata/config.yaml b/processor/resourceprocessor/testdata/config.yaml index b770ca2be15..ac5ebc24b4e 100644 --- a/processor/resourceprocessor/testdata/config.yaml +++ b/processor/resourceprocessor/testdata/config.yaml @@ -2,18 +2,25 @@ receivers: examplereceiver: processors: - # The following specifies an empty resource - it will have no effect on trace or metrics data. + # The following specifies a resource configuration doing the changes on resource attributes: + # 1. Set "cloud.zone" attributes with "zone-1" value ignoring existing values. + # 2. Copy "k8s-cluster" attribute value to "k8s.cluster.name" attribute, nothing happens if "k8s-cluster" not found. + # 3. Remove "redundant-attribute" attribute. + # There are many more attribute modification actions supported, + # check processor/attributesprocessor/testdata/config.yaml for reference. resource: - # The following specifies a non-trivial resource. Type "host" is used for Kubernetes node resources - # that expect the labels "cloud.zone", "k8s.cluster.name", "host.name" to be defined (although this - # is not enforced by the configuration logic). - resource/2: - type: "host" - labels: { - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - } + attributes: + - key: cloud.zone + value: zone-1 + action: upsert + - key: k8s.cluster.name + from_attribute: k8s-cluster + action: insert + - key: redundant-attribute + action: delete + # The following specifies an invalid resource configuration, it has to have at least one action set in attributes field. + resource/invalid: + exporters: exampleexporter: @@ -22,5 +29,9 @@ service: pipelines: metrics: receivers: [examplereceiver] - processors: [resource/2] + processors: [resource] + exporters: [exampleexporter] + traces: + receivers: [examplereceiver] + processors: [resource] exporters: [exampleexporter] diff --git a/testbed/tests/resource_processor_test.go b/testbed/tests/resource_processor_test.go index ad1958c8ac6..dcadefda993 100644 --- a/testbed/tests/resource_processor_test.go +++ b/testbed/tests/resource_processor_test.go @@ -67,38 +67,6 @@ const ( } ` - mockedConsumedResourceWithoutTypeJSON = ` - { - "resource": { - "attributes": [ - { - "key": "label-key", - "value": { "stringValue": "label-value" } - } - ] - }, - "instrumentation_library_metrics": [ - { - "metrics": [ - { - "metric_descriptor": { - "name": "metric-name", - "description": "metric-description", - "unit": "metric-unit", - "type": 1 - }, - "int64_data_points": [ - { - "value": 0 - } - ] - } - ] - } - ] - } -` - mockedConsumedResourceNilJSON = ` { "instrumentation_library_metrics": [ @@ -153,76 +121,56 @@ type resourceProcessorTestCase struct { resourceProcessorConfig string mockedConsumedMetricData data.MetricData expectedMetricData data.MetricData - isNilResource bool } func getResourceProcessorTestCases(t *testing.T) []resourceProcessorTestCase { tests := []resourceProcessorTestCase{ { - name: "Override consumed resource labels and type", + name: "update_and_rename_existing_attributes", resourceProcessorConfig: ` resource: - type: vm - labels: { - "additional-label-key": "additional-label-value", - } + attributes: + - key: label-key + value: new-label-value + action: update + - key: resource-type + from_attribute: opencensus.resourcetype + action: upsert + - key: opencensus.resourcetype + action: delete `, mockedConsumedMetricData: getMetricDataFromJSON(t, mockedConsumedResourceWithTypeJSON), expectedMetricData: getMetricDataFromResourceMetrics(&otlpmetrics.ResourceMetrics{ Resource: &otlpresource.Resource{ Attributes: []*otlpcommon.KeyValue{ { - Key: "opencensus.resourcetype", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "vm"}}, + Key: "resource-type", + Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "host"}}, }, { Key: "label-key", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "label-value"}}, - }, - { - Key: "additional-label-key", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "additional-label-value"}}, + Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "new-label-value"}}, }, }, }, }), }, { - name: "Return nil if consumed resource is nil and type is empty", - resourceProcessorConfig: ` - resource: - labels: { - "additional-label-key": "additional-label-value", - } -`, - mockedConsumedMetricData: getMetricDataFromJSON(t, mockedConsumedResourceNilJSON), - isNilResource: true, - }, - { - name: "Return nil if consumed resource and resource in config is nil", + name: "set_attribute_on_nil_resource", resourceProcessorConfig: ` resource: + attributes: + - key: additional-label-key + value: additional-label-value + action: insert + `, mockedConsumedMetricData: getMetricDataFromJSON(t, mockedConsumedResourceNilJSON), - isNilResource: true, - }, - { - name: "Return resource without type", - resourceProcessorConfig: ` - resource: - labels: { - "additional-label-key": "additional-label-value", - } -`, - mockedConsumedMetricData: getMetricDataFromJSON(t, mockedConsumedResourceWithoutTypeJSON), expectedMetricData: getMetricDataFromResourceMetrics(&otlpmetrics.ResourceMetrics{ + Resource: &otlpresource.Resource{ Attributes: []*otlpcommon.KeyValue{ - { - Key: "label-key", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "label-value"}}, - }, { Key: "additional-label-key", Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "additional-label-value"}}, @@ -232,12 +180,13 @@ func getResourceProcessorTestCases(t *testing.T) []resourceProcessorTestCase { }), }, { - name: "Consumed resource with nil labels", + name: "set_attribute_on_empty_resource", resourceProcessorConfig: ` resource: - labels: { - "additional-label-key": "additional-label-value", - } + attributes: + - key: additional-label-key + value: additional-label-value + action: insert `, mockedConsumedMetricData: getMetricDataFromJSON(t, mockedConsumedResourceWithoutAttributesJSON), expectedMetricData: getMetricDataFromResourceMetrics(&otlpmetrics.ResourceMetrics{ @@ -333,12 +282,6 @@ func TestMetricResourceProcessor(t *testing.T) { rm := pdatautil.MetricsToInternalMetrics(m).ResourceMetrics() require.Equal(t, 1, rm.Len()) - // If a resource is not expected to be returned by the processor, return. - if test.isNilResource { - require.True(t, rm.At(0).Resource().IsNil()) - return - } - require.Equal(t, attributesToMap(test.expectedMetricData.ResourceMetrics().At(0).Resource().Attributes()), attributesToMap(rm.At(0).Resource().Attributes()),