Skip to content

Commit

Permalink
blocking/non-blocking version
Browse files Browse the repository at this point in the history
Signed-off-by: odubajDT <ondrej.dubaj@dynatrace.com>
  • Loading branch information
odubajDT committed Jan 28, 2025
1 parent d984179 commit 9c0d598
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 165 deletions.
13 changes: 12 additions & 1 deletion processor/resourcedetectionprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ resourcedetection:

## Ordering

Note that if multiple detectors are inserting the same attribute name, the first detector to insert wins. For example if you had `detectors: [eks, ec2]` then `cloud.platform` will be `aws_eks` instead of `ec2`. The below ordering is recommended.
By default, if multiple detectors are inserting the same attribute name, the first detector to insert wins. For example if you had `detectors: [eks, ec2]` then `cloud.platform` will be `aws_eks` instead of `ec2`. The below ordering is recommended.

### AWS

Expand All @@ -627,3 +627,14 @@ Note that if multiple detectors are inserting the same attribute name, the first

The full list of settings exposed for this extension are documented in [config.go](./config.go)
with detailed sample configurations in [testdata/config.yaml](./testdata/config.yaml).

**Note:**

If you want to disable the ordering of the detectors and instead have a non-blocking resource detection in case of a detection failure, set the `keepOrder` parameter to `false`. For example:

```yaml
processors:
resourcedetection:
detectors: [docker]
keepOrder: false
```
3 changes: 3 additions & 0 deletions processor/resourcedetectionprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type Config struct {
// If a supplied attribute is not a valid attribute of a supplied detector it will be ignored.
// Deprecated: Please use detector's resource_attributes config instead
Attributes []string `mapstructure:"attributes"`
// KeepOrder is a parameter which perserves the order of detectors and the detection of data
// This also introduces a blocking behavior, if one of the detectors cannot detect the resource
KeepOrder bool `mapstructure:"keepOrder"`
}

// DetectorConfig contains user-specified configurations unique to all individual detectors
Expand Down
6 changes: 4 additions & 2 deletions processor/resourcedetectionprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func createDefaultConfig() component.Config {
Override: true,
Attributes: nil,
DetectorConfig: detectorCreateDefaultConfig(),
KeepOrder: true,
// TODO: Once issue(https://github.com/open-telemetry/opentelemetry-collector/issues/4001) gets resolved,
// Set the default value of 'hostname_source' here instead of 'system' detector
}
Expand Down Expand Up @@ -199,7 +200,7 @@ func (f *factory) getResourceDetectionProcessor(
if oCfg.Attributes != nil {
params.Logger.Warn("You are using deprecated `attributes` option that will be removed soon; use `resource_attributes` instead, details on configuration: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/resourcedetectionprocessor#migration-from-attributes-to-resource_attributes")
}
provider, err := f.getResourceProvider(params, oCfg.ClientConfig.Timeout, oCfg.Detectors, oCfg.DetectorConfig, oCfg.Attributes)
provider, err := f.getResourceProvider(params, oCfg.ClientConfig.Timeout, oCfg.Detectors, oCfg.DetectorConfig, oCfg.Attributes, oCfg.KeepOrder)
if err != nil {
return nil, err
}
Expand All @@ -218,6 +219,7 @@ func (f *factory) getResourceProvider(
configuredDetectors []string,
detectorConfigs DetectorConfig,
attributes []string,
keepOrder bool,
) (*internal.ResourceProvider, error) {
f.lock.Lock()
defer f.lock.Unlock()
Expand All @@ -231,7 +233,7 @@ func (f *factory) getResourceProvider(
detectorTypes = append(detectorTypes, internal.DetectorType(strings.TrimSpace(key)))
}

provider, err := f.resourceProviderFactory.CreateResourceProvider(params, timeout, attributes, &detectorConfigs, detectorTypes...)
provider, err := f.resourceProviderFactory.CreateResourceProvider(params, timeout, attributes, &detectorConfigs, keepOrder, detectorTypes...)
if err != nil {
return nil, err
}
Expand Down
44 changes: 28 additions & 16 deletions processor/resourcedetectionprocessor/internal/resourcedetection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (f *ResourceProviderFactory) CreateResourceProvider(
timeout time.Duration,
attributes []string,
detectorConfigs ResourceDetectorConfig,
keepOrder bool,
detectorTypes ...DetectorType,
) (*ResourceProvider, error) {
detectors, err := f.getDetectors(params, detectorConfigs, detectorTypes)
Expand All @@ -59,7 +60,7 @@ func (f *ResourceProviderFactory) CreateResourceProvider(
}
}

provider := NewResourceProvider(params.Logger, timeout, attributesToKeep, detectors...)
provider := NewResourceProvider(params.Logger, timeout, attributesToKeep, keepOrder, detectors...)
return provider, nil
}

Expand Down Expand Up @@ -89,6 +90,7 @@ type ResourceProvider struct {
detectedResource *resourceResult
once sync.Once
attributesToKeep map[string]struct{}
keepOrder bool
}

type resourceResult struct {
Expand All @@ -97,12 +99,13 @@ type resourceResult struct {
err error
}

func NewResourceProvider(logger *zap.Logger, timeout time.Duration, attributesToKeep map[string]struct{}, detectors ...Detector) *ResourceProvider {
func NewResourceProvider(logger *zap.Logger, timeout time.Duration, attributesToKeep map[string]struct{}, keepOrder bool, detectors ...Detector) *ResourceProvider {
return &ResourceProvider{
logger: logger,
timeout: timeout,
detectors: detectors,
attributesToKeep: attributesToKeep,
keepOrder: keepOrder,
}
}

Expand All @@ -114,6 +117,21 @@ func (p *ResourceProvider) Get(ctx context.Context, _ *http.Client) (resource pc
return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err
}

type detectResult struct {
r pcommon.Resource
schemaURL string
err error
}

func handleResult(res *pcommon.Resource, resultsChan chan detectResult, mergedSchemaURL string) string {
result := <-resultsChan
if result.err == nil {
mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, result.schemaURL)
MergeResource(*res, result.r, false)
}
return mergedSchemaURL
}

func (p *ResourceProvider) detectResource(ctx context.Context) {
p.detectedResource = &resourceResult{}

Expand All @@ -122,13 +140,7 @@ func (p *ResourceProvider) detectResource(ctx context.Context) {

p.logger.Info("began detecting resource information")

type result struct {
r pcommon.Resource
schemaURL string
err error
}

resultsChan := make(chan result, len(p.detectors))
resultsChan := make(chan detectResult, len(p.detectors))
for _, detector := range p.detectors {
go func(detector Detector) {
sleep := 2 * time.Second
Expand All @@ -139,18 +151,18 @@ func (p *ResourceProvider) detectResource(ctx context.Context) {
time.Sleep(sleep)
sleep *= 2
} else {
resultsChan <- result{r: r, schemaURL: schemaURL, err: nil}
resultsChan <- detectResult{r: r, schemaURL: schemaURL, err: nil}
return
}
}
}(detector)
if p.keepOrder {
mergedSchemaURL = handleResult(&res, resultsChan, mergedSchemaURL)
}
}

for range p.detectors {
result := <-resultsChan
if result.err == nil {
mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, result.schemaURL)
MergeResource(res, result.r, false)
if !p.keepOrder {
for range p.detectors {
mergedSchemaURL = handleResult(&res, resultsChan, mergedSchemaURL)
}
}

Expand Down
Loading

0 comments on commit 9c0d598

Please sign in to comment.