Skip to content

Commit

Permalink
[processor/resourcedetection] introduce retry mechanism for detectors
Browse files Browse the repository at this point in the history
Signed-off-by: odubajDT <ondrej.dubaj@dynatrace.com>
  • Loading branch information
odubajDT committed Jan 27, 2025
1 parent 7c32a5d commit 3e8cb2e
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 9 deletions.
85 changes: 77 additions & 8 deletions processor/resourcedetectionprocessor/internal/resourcedetection.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,27 +111,67 @@ func (p *ResourceProvider) Get(ctx context.Context, client *http.Client) (resour
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, client.Timeout)
defer cancel()
p.detectResource(ctx)
p.detectResource(ctx, client)
})

return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err
}

func (p *ResourceProvider) detectResource(ctx context.Context) {
// func (p *ResourceProvider) Get(ctx context.Context, client *http.Client) (resource pcommon.Resource, schemaURL string, err error) {
// var cancel context.CancelFunc
// for {
// ctx, cancel = context.WithTimeout(ctx, client.Timeout)
// defer cancel()

// err = p.detectResource(ctx)
// if err == nil {
// break
// }

// // Handle the error (e.g., log it, wait before retrying, etc.)
// p.logger.Warn("Failed to detect resource: Retrying...", zap.Error(err))
// time.Sleep(2 * time.Second) // Wait before retrying
// }

// return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err
// }

func (p *ResourceProvider) detectResource(ctx context.Context, client *http.Client) {
p.detectedResource = &resourceResult{}

res := pcommon.NewResource()
mergedSchemaURL := ""

p.logger.Info("began detecting resource information")

type result struct {
r pcommon.Resource
schemaURL string
err error
}

resultsChan := make(chan result, len(p.detectors))
for _, detector := range p.detectors {
r, schemaURL, err := detector.Detect(ctx)
if err != nil {
p.logger.Warn("failed to detect resource", zap.Error(err))
} else {
mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, schemaURL)
MergeResource(res, r, false)
go func(detector Detector) {
for {
//ctx, _ = context.WithTimeout(ctx, client.Timeout)
r, schemaURL, err := detector.Detect(context.TODO())
if err != nil {
p.logger.Warn("failed to detect resource", zap.Error(err))
time.Sleep(2 * time.Second) // Wait before retrying
} else {
resultsChan <- result{r: r, schemaURL: schemaURL, err: nil}
return
}
}
}(detector)
}

for range p.detectors {
result := <-resultsChan
if result.err == nil {
mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, result.schemaURL)
MergeResource(res, result.r, false)
}
}

Expand All @@ -146,6 +186,35 @@ func (p *ResourceProvider) detectResource(ctx context.Context) {
p.detectedResource.schemaURL = mergedSchemaURL
}

// func (p *ResourceProvider) detectResource(ctx context.Context) {
// p.detectedResource = &resourceResult{}

// res := pcommon.NewResource()
// mergedSchemaURL := ""

// p.logger.Info("began detecting resource information")

// for _, detector := range p.detectors {
// r, schemaURL, err := detector.Detect(ctx)
// if err != nil {
// p.logger.Warn("failed to detect resource", zap.Error(err))
// } else {
// mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, schemaURL)
// MergeResource(res, r, false)
// }
// }

// droppedAttributes := filterAttributes(res.Attributes(), p.attributesToKeep)

// p.logger.Info("detected resource information", zap.Any("resource", res.Attributes().AsRaw()))
// if len(droppedAttributes) > 0 {
// p.logger.Info("dropped resource information", zap.Strings("resource keys", droppedAttributes))
// }

// p.detectedResource.resource = res
// p.detectedResource.schemaURL = mergedSchemaURL
// }

func MergeSchemaURL(currentSchemaURL string, newSchemaURL string) string {
if currentSchemaURL == "" {
return newSchemaURL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (rdp *resourceDetectionProcessor) Start(ctx context.Context, host component
client, _ := rdp.httpClientSettings.ToClient(ctx, host, rdp.telemetrySettings)
ctx = internal.ContextWithClient(ctx, client)
var err error
rdp.resource, rdp.schemaURL, err = rdp.provider.Get(ctx, client)
rdp.resource, rdp.schemaURL, err = rdp.provider.Get(ctx, client) // here
return err
}

Expand Down

0 comments on commit 3e8cb2e

Please sign in to comment.