diff --git a/processor/resourcedetectionprocessor/README.md b/processor/resourcedetectionprocessor/README.md index a19bc1471f91..cf714dcf6881 100644 --- a/processor/resourcedetectionprocessor/README.md +++ b/processor/resourcedetectionprocessor/README.md @@ -615,7 +615,7 @@ resourcedetection: ## Ordering -Note that if multiple detectors are inserting the same attribute name, the first detector to insert wins. For example if you had `detectors: [eks, ec2]` then `cloud.platform` will be `aws_eks` instead of `ec2`. The below ordering is recommended. +By default, if multiple detectors are inserting the same attribute name, the first detector to insert wins. For example if you had `detectors: [eks, ec2]` then `cloud.platform` will be `aws_eks` instead of `ec2`. The below ordering is recommended. ### AWS @@ -627,3 +627,14 @@ Note that if multiple detectors are inserting the same attribute name, the first The full list of settings exposed for this extension are documented in [config.go](./config.go) with detailed sample configurations in [testdata/config.yaml](./testdata/config.yaml). + +**Note:** + +If you want to disable the ordering of the detectors and instead have a non-blocking resource detection in case of a detection failure, set the `keepOrder` parameter to `false`. For example: + +```yaml +processors: + resourcedetection: + detectors: [docker] + keepOrder: false +``` diff --git a/processor/resourcedetectionprocessor/config.go b/processor/resourcedetectionprocessor/config.go index 8685443bea18..ad9f0cd47c08 100644 --- a/processor/resourcedetectionprocessor/config.go +++ b/processor/resourcedetectionprocessor/config.go @@ -41,6 +41,9 @@ type Config struct { // If a supplied attribute is not a valid attribute of a supplied detector it will be ignored. 
// Deprecated: Please use detector's resource_attributes config instead Attributes []string `mapstructure:"attributes"` + // KeepOrder is a parameter which preserves the order of detectors and the detection of data + // This also introduces a blocking behavior, if one of the detectors cannot detect the resource + KeepOrder bool `mapstructure:"keepOrder"` } // DetectorConfig contains user-specified configurations unique to all individual detectors diff --git a/processor/resourcedetectionprocessor/factory.go b/processor/resourcedetectionprocessor/factory.go index 9f47c2ce69d4..2d4b94799713 100644 --- a/processor/resourcedetectionprocessor/factory.go +++ b/processor/resourcedetectionprocessor/factory.go @@ -96,6 +96,7 @@ func createDefaultConfig() component.Config { Override: true, Attributes: nil, DetectorConfig: detectorCreateDefaultConfig(), + KeepOrder: true, // TODO: Once issue(https://github.com/open-telemetry/opentelemetry-collector/issues/4001) gets resolved, // Set the default value of 'hostname_source' here instead of 'system' detector } @@ -199,7 +200,7 @@ func (f *factory) getResourceDetectionProcessor( if oCfg.Attributes != nil { params.Logger.Warn("You are using deprecated `attributes` option that will be removed soon; use `resource_attributes` instead, details on configuration: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/resourcedetectionprocessor#migration-from-attributes-to-resource_attributes") } - provider, err := f.getResourceProvider(params, oCfg.ClientConfig.Timeout, oCfg.Detectors, oCfg.DetectorConfig, oCfg.Attributes) + provider, err := f.getResourceProvider(params, oCfg.ClientConfig.Timeout, oCfg.Detectors, oCfg.DetectorConfig, oCfg.Attributes, oCfg.KeepOrder) if err != nil { return nil, err } @@ -218,6 +219,7 @@ func (f *factory) getResourceProvider( configuredDetectors []string, detectorConfigs DetectorConfig, attributes []string, + keepOrder bool, ) (*internal.ResourceProvider, error) { 
f.lock.Lock() defer f.lock.Unlock() @@ -231,7 +233,7 @@ func (f *factory) getResourceProvider( detectorTypes = append(detectorTypes, internal.DetectorType(strings.TrimSpace(key))) } - provider, err := f.resourceProviderFactory.CreateResourceProvider(params, timeout, attributes, &detectorConfigs, detectorTypes...) + provider, err := f.resourceProviderFactory.CreateResourceProvider(params, timeout, attributes, &detectorConfigs, keepOrder, detectorTypes...) if err != nil { return nil, err } diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection.go b/processor/resourcedetectionprocessor/internal/resourcedetection.go index 05d1ca0aa3f0..da7735d3cb76 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection.go @@ -45,6 +45,7 @@ func (f *ResourceProviderFactory) CreateResourceProvider( timeout time.Duration, attributes []string, detectorConfigs ResourceDetectorConfig, + keepOrder bool, detectorTypes ...DetectorType, ) (*ResourceProvider, error) { detectors, err := f.getDetectors(params, detectorConfigs, detectorTypes) @@ -59,7 +60,7 @@ func (f *ResourceProviderFactory) CreateResourceProvider( } } - provider := NewResourceProvider(params.Logger, timeout, attributesToKeep, detectors...) + provider := NewResourceProvider(params.Logger, timeout, attributesToKeep, keepOrder, detectors...) 
return provider, nil } @@ -89,6 +90,7 @@ type ResourceProvider struct { detectedResource *resourceResult once sync.Once attributesToKeep map[string]struct{} + keepOrder bool } type resourceResult struct { @@ -97,12 +99,13 @@ type resourceResult struct { err error } -func NewResourceProvider(logger *zap.Logger, timeout time.Duration, attributesToKeep map[string]struct{}, detectors ...Detector) *ResourceProvider { +func NewResourceProvider(logger *zap.Logger, timeout time.Duration, attributesToKeep map[string]struct{}, keepOrder bool, detectors ...Detector) *ResourceProvider { return &ResourceProvider{ logger: logger, timeout: timeout, detectors: detectors, attributesToKeep: attributesToKeep, + keepOrder: keepOrder, } } @@ -114,6 +117,21 @@ func (p *ResourceProvider) Get(ctx context.Context, _ *http.Client) (resource pc return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err } +type detectResult struct { + r pcommon.Resource + schemaURL string + err error +} + +func handleResult(res *pcommon.Resource, resultsChan chan detectResult, mergedSchemaURL string) string { + result := <-resultsChan + if result.err == nil { + mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, result.schemaURL) + MergeResource(*res, result.r, false) + } + return mergedSchemaURL +} + func (p *ResourceProvider) detectResource(ctx context.Context) { p.detectedResource = &resourceResult{} @@ -122,13 +140,7 @@ func (p *ResourceProvider) detectResource(ctx context.Context) { p.logger.Info("began detecting resource information") - type result struct { - r pcommon.Resource - schemaURL string - err error - } - - resultsChan := make(chan result, len(p.detectors)) + resultsChan := make(chan detectResult, len(p.detectors)) for _, detector := range p.detectors { go func(detector Detector) { sleep := 2 * time.Second @@ -139,18 +151,18 @@ func (p *ResourceProvider) detectResource(ctx context.Context) { time.Sleep(sleep) sleep *= 2 } else { - resultsChan <- result{r: r, 
schemaURL: schemaURL, err: nil} + resultsChan <- detectResult{r: r, schemaURL: schemaURL, err: nil} return } } }(detector) + if p.keepOrder { + mergedSchemaURL = handleResult(&res, resultsChan, mergedSchemaURL) + } } - - for range p.detectors { - result := <-resultsChan - if result.err == nil { - mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, result.schemaURL) - MergeResource(res, result.r, false) + if !p.keepOrder { + for range p.detectors { + mergedSchemaURL = handleResult(&res, resultsChan, mergedSchemaURL) } } diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go index 833ad856552e..4d1f84503d48 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go @@ -7,6 +7,8 @@ import ( "context" "errors" "fmt" + "net/http" + "sync" "testing" "time" @@ -16,6 +18,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" + "go.uber.org/zap" ) type MockDetector struct { @@ -33,86 +36,86 @@ func (d *mockDetectorConfig) GetConfigFromType(_ DetectorType) DetectorConfig { return nil } -// func TestDetect(t *testing.T) { -// tests := []struct { -// name string -// detectedResources []map[string]any -// expectedResource map[string]any -// attributes []string -// }{ -// { -// name: "Detect three resources", -// detectedResources: []map[string]any{ -// {"a": "1", "b": "2"}, -// {"a": "11", "c": "3"}, -// {"a": "12", "c": "3"}, -// }, -// expectedResource: map[string]any{"a": "1", "b": "2", "c": "3"}, -// attributes: nil, -// }, { -// name: "Detect empty resources", -// detectedResources: []map[string]any{ -// {"a": "1", "b": "2"}, -// {}, -// {"a": "11"}, -// }, -// expectedResource: map[string]any{"a": "1", "b": "2"}, -// attributes: nil, -// }, { -// name: "Detect 
non-string resources", -// detectedResources: []map[string]any{ -// {"bool": true, "int": int64(2), "double": 0.5}, -// {"bool": false}, -// {"a": "11"}, -// }, -// expectedResource: map[string]any{"a": "11", "bool": true, "int": int64(2), "double": 0.5}, -// attributes: nil, -// }, { -// name: "Filter to one attribute", -// detectedResources: []map[string]any{ -// {"a": "1", "b": "2"}, -// {"a": "11", "c": "3"}, -// {"a": "12", "c": "3"}, -// }, -// expectedResource: map[string]any{"a": "1"}, -// attributes: []string{"a"}, -// }, -// } - -// for _, tt := range tests { -// t.Run(tt.name, func(t *testing.T) { -// mockDetectors := make(map[DetectorType]DetectorFactory, len(tt.detectedResources)) -// mockDetectorTypes := make([]DetectorType, 0, len(tt.detectedResources)) - -// for i, resAttrs := range tt.detectedResources { -// md := &MockDetector{} -// res := pcommon.NewResource() -// require.NoError(t, res.Attributes().FromRaw(resAttrs)) -// md.On("Detect").Return(res, nil) - -// mockDetectorType := DetectorType(fmt.Sprintf("mockdetector%v", i)) -// mockDetectors[mockDetectorType] = func(processor.Settings, DetectorConfig) (Detector, error) { -// return md, nil -// } -// mockDetectorTypes = append(mockDetectorTypes, mockDetectorType) -// } - -// f := NewProviderFactory(mockDetectors) -// p, err := f.CreateResourceProvider(processortest.NewNopSettings(), time.Second, tt.attributes, &mockDetectorConfig{}, mockDetectorTypes...) 
-// require.NoError(t, err) - -// got, _, err := p.Get(context.Background(), http.DefaultClient) -// require.NoError(t, err) - -// assert.Equal(t, tt.expectedResource, got.Attributes().AsRaw()) -// }) -// } -// } +func TestDetect(t *testing.T) { + tests := []struct { + name string + detectedResources []map[string]any + expectedResource map[string]any + attributes []string + }{ + { + name: "Detect three resources", + detectedResources: []map[string]any{ + {"a": "1", "b": "2"}, + {"a": "11", "c": "3"}, + {"a": "12", "c": "3"}, + }, + expectedResource: map[string]any{"a": "1", "b": "2", "c": "3"}, + attributes: nil, + }, { + name: "Detect empty resources", + detectedResources: []map[string]any{ + {"a": "1", "b": "2"}, + {}, + {"a": "11"}, + }, + expectedResource: map[string]any{"a": "1", "b": "2"}, + attributes: nil, + }, { + name: "Detect non-string resources", + detectedResources: []map[string]any{ + {"bool": true, "int": int64(2), "double": 0.5}, + {"bool": false}, + {"a": "11"}, + }, + expectedResource: map[string]any{"a": "11", "bool": true, "int": int64(2), "double": 0.5}, + attributes: nil, + }, { + name: "Filter to one attribute", + detectedResources: []map[string]any{ + {"a": "1", "b": "2"}, + {"a": "11", "c": "3"}, + {"a": "12", "c": "3"}, + }, + expectedResource: map[string]any{"a": "1"}, + attributes: []string{"a"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockDetectors := make(map[DetectorType]DetectorFactory, len(tt.detectedResources)) + mockDetectorTypes := make([]DetectorType, 0, len(tt.detectedResources)) + + for i, resAttrs := range tt.detectedResources { + md := &MockDetector{} + res := pcommon.NewResource() + require.NoError(t, res.Attributes().FromRaw(resAttrs)) + md.On("Detect").Return(res, nil) + + mockDetectorType := DetectorType(fmt.Sprintf("mockdetector%v", i)) + mockDetectors[mockDetectorType] = func(processor.Settings, DetectorConfig) (Detector, error) { + return md, nil + } + mockDetectorTypes = 
append(mockDetectorTypes, mockDetectorType) + } + + f := NewProviderFactory(mockDetectors) + p, err := f.CreateResourceProvider(processortest.NewNopSettings(), time.Second, tt.attributes, &mockDetectorConfig{}, true, mockDetectorTypes...) + require.NoError(t, err) + + got, _, err := p.Get(context.Background(), http.DefaultClient) + require.NoError(t, err) + + assert.Equal(t, tt.expectedResource, got.Attributes().AsRaw()) + }) + } +} func TestDetectResource_InvalidDetectorType(t *testing.T) { mockDetectorKey := DetectorType("mock") p := NewProviderFactory(map[DetectorType]DetectorFactory{}) - _, err := p.CreateResourceProvider(processortest.NewNopSettings(), time.Second, nil, &mockDetectorConfig{}, mockDetectorKey) + _, err := p.CreateResourceProvider(processortest.NewNopSettings(), time.Second, nil, &mockDetectorConfig{}, true, mockDetectorKey) require.EqualError(t, err, fmt.Sprintf("invalid detector key: %v", mockDetectorKey)) } @@ -123,24 +126,10 @@ func TestDetectResource_DetectorFactoryError(t *testing.T) { return nil, errors.New("creation failed") }, }) - _, err := p.CreateResourceProvider(processortest.NewNopSettings(), time.Second, nil, &mockDetectorConfig{}, mockDetectorKey) + _, err := p.CreateResourceProvider(processortest.NewNopSettings(), time.Second, nil, &mockDetectorConfig{}, true, mockDetectorKey) require.EqualError(t, err, fmt.Sprintf("failed creating detector type %q: %v", mockDetectorKey, "creation failed")) } -// func TestDetectResource_Error(t *testing.T) { -// md1 := &MockDetector{} -// res := pcommon.NewResource() -// require.NoError(t, res.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"})) -// md1.On("Detect").Return(res, nil) - -// md2 := &MockDetector{} -// md2.On("Detect").Return(pcommon.NewResource(), errors.New("err1")) - -// p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2) -// _, _, err := p.Get(context.Background(), http.DefaultClient) -// require.NoError(t, err) -// } - func TestMergeResource(t *testing.T) { 
for _, tt := range []struct { name string @@ -191,52 +180,47 @@ func (p *MockParallelDetector) Detect(_ context.Context) (pcommon.Resource, stri // TestDetectResource_Parallel validates that Detect is only called once, even if there // are multiple calls to ResourceProvider.Get -// func TestDetectResource_Parallel(t *testing.T) { -// const iterations = 5 - -// md1 := NewMockParallelDetector() -// res1 := pcommon.NewResource() -// require.NoError(t, res1.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"})) -// md1.On("Detect").Return(res1, nil) - -// md2 := NewMockParallelDetector() -// res2 := pcommon.NewResource() -// require.NoError(t, res2.Attributes().FromRaw(map[string]any{"a": "11", "c": "3"})) -// md2.On("Detect").Return(res2, nil) - -// md3 := NewMockParallelDetector() -// md3.On("Detect").Return(pcommon.NewResource(), errors.New("an error")) - -// expectedResourceAttrs := map[string]any{"a": "1", "b": "2", "c": "3"} - -// p := NewResourceProvider(zap.NewNop(), time.Second, nil, md1, md2, md3) - -// // call p.Get multiple times -// wg := &sync.WaitGroup{} -// wg.Add(iterations) -// for i := 0; i < iterations; i++ { -// go func() { -// defer wg.Done() -// detected, _, err := p.Get(context.Background(), http.DefaultClient) -// assert.NoError(t, err) -// assert.Equal(t, expectedResourceAttrs, detected.Attributes().AsRaw()) -// }() -// } - -// // wait until all goroutines are blocked -// time.Sleep(5 * time.Millisecond) - -// // detector.Detect should only be called once, so we only need to notify each channel once -// md1.ch <- struct{}{} -// md2.ch <- struct{}{} -// md3.ch <- struct{}{} - -// // then wait until all goroutines are finished, and ensure p.Detect was only called once -// wg.Wait() -// md1.AssertNumberOfCalls(t, "Detect", 1) -// md2.AssertNumberOfCalls(t, "Detect", 1) -// md3.AssertNumberOfCalls(t, "Detect", 1) -// } +func TestDetectResource_Parallel(t *testing.T) { + const iterations = 5 + + md1 := NewMockParallelDetector() + res1 := 
pcommon.NewResource() + require.NoError(t, res1.Attributes().FromRaw(map[string]any{"a": "1", "b": "2"})) + md1.On("Detect").Return(res1, nil) + + md2 := NewMockParallelDetector() + res2 := pcommon.NewResource() + require.NoError(t, res2.Attributes().FromRaw(map[string]any{"a": "11", "c": "3"})) + md2.On("Detect").Return(res2, nil) + + expectedResourceAttrs := map[string]any{"a": "1", "b": "2", "c": "3"} + + p := NewResourceProvider(zap.NewNop(), time.Second, nil, true, md1, md2) + + // call p.Get multiple times + wg := &sync.WaitGroup{} + wg.Add(iterations) + for i := 0; i < iterations; i++ { + go func() { + defer wg.Done() + detected, _, err := p.Get(context.Background(), http.DefaultClient) + assert.NoError(t, err) + assert.Equal(t, expectedResourceAttrs, detected.Attributes().AsRaw()) + }() + } + + // wait until all goroutines are blocked + time.Sleep(5 * time.Millisecond) + + // detector.Detect should only be called once, so we only need to notify each channel once + md1.ch <- struct{}{} + md2.ch <- struct{}{} + + // then wait until all goroutines are finished, and ensure p.Detect was only called once + wg.Wait() + md1.AssertNumberOfCalls(t, "Detect", 1) + md2.AssertNumberOfCalls(t, "Detect", 1) +} func TestFilterAttributes_Match(t *testing.T) { m := map[string]struct{}{ diff --git a/processor/resourcedetectionprocessor/resourcedetection_processor_test.go b/processor/resourcedetectionprocessor/resourcedetection_processor_test.go index c247c75cecaf..dde087175a4e 100644 --- a/processor/resourcedetectionprocessor/resourcedetection_processor_test.go +++ b/processor/resourcedetectionprocessor/resourcedetection_processor_test.go @@ -129,15 +129,6 @@ func TestResourceProcessor(t *testing.T) { detectedResource: nil, expectedResource: map[string]any{}, }, - // { - // name: "Detection error", - // sourceResource: map[string]any{ - // "type": "original-type", - // "original-label": "original-value", - // "cloud.availability_zone": "original-zone", - // }, - // 
detectedError: errors.New("err1"), - // }, { name: "Invalid detector key", detectorKeys: []string{"invalid-key"},