From 3e8cb2e15b892c3e6e4e8c09e98294673452424f Mon Sep 17 00:00:00 2001 From: odubajDT Date: Mon, 27 Jan 2025 14:07:37 +0100 Subject: [PATCH] [processor/resourcedetection] introduce retry mechanism for detectors Signed-off-by: odubajDT --- .../internal/resourcedetection.go | 85 +++++++++++++++++-- .../resourcedetection_processor.go | 2 +- 2 files changed, 78 insertions(+), 9 deletions(-) diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection.go b/processor/resourcedetectionprocessor/internal/resourcedetection.go index 63d8d3ee22897..9e99a87a7fe3d 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection.go @@ -111,13 +111,32 @@ func (p *ResourceProvider) Get(ctx context.Context, client *http.Client) (resour var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, client.Timeout) defer cancel() - p.detectResource(ctx) + p.detectResource(ctx, client) }) return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err } -func (p *ResourceProvider) detectResource(ctx context.Context) { +// func (p *ResourceProvider) Get(ctx context.Context, client *http.Client) (resource pcommon.Resource, schemaURL string, err error) { +// var cancel context.CancelFunc +// for { +// ctx, cancel = context.WithTimeout(ctx, client.Timeout) +// defer cancel() + +// err = p.detectResource(ctx) +// if err == nil { +// break +// } + +// // Handle the error (e.g., log it, wait before retrying, etc.) +// p.logger.Warn("Failed to detect resource: Retrying...", zap.Error(err)) +// time.Sleep(2 * time.Second) // Wait before retrying +// } + +// return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err +// } + +func (p *ResourceProvider) detectResource(ctx context.Context, client *http.Client) { p.detectedResource = &resourceResult{} res := pcommon.NewResource() @@ -125,13 +144,34 @@ func (p *ResourceProvider) detectResource(ctx context.Context) { p.logger.Info("began detecting resource information") + type result struct { + r pcommon.Resource + schemaURL string + err error + } + + resultsChan := make(chan result, len(p.detectors)) for _, detector := range p.detectors { - r, schemaURL, err := detector.Detect(ctx) - if err != nil { - p.logger.Warn("failed to detect resource", zap.Error(err)) - } else { - mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, schemaURL) - MergeResource(res, r, false) + go func(detector Detector) { + for { + //ctx, _ = context.WithTimeout(ctx, client.Timeout) + r, schemaURL, err := detector.Detect(context.TODO()) + if err != nil { + p.logger.Warn("failed to detect resource", zap.Error(err)) + time.Sleep(2 * time.Second) // Wait before retrying + } else { + resultsChan <- result{r: r, schemaURL: schemaURL, err: nil} + return + } + } + }(detector) + } + + for range p.detectors { + result := <-resultsChan + if result.err == nil { + mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, result.schemaURL) + MergeResource(res, result.r, false) } } @@ -146,6 +186,35 @@ func (p *ResourceProvider) detectResource(ctx context.Context) { p.detectedResource.schemaURL = mergedSchemaURL } +// func (p *ResourceProvider) detectResource(ctx context.Context) { +// p.detectedResource = &resourceResult{} + +// res := pcommon.NewResource() +// mergedSchemaURL := "" + +// p.logger.Info("began detecting resource information") + +// for _, detector := range p.detectors { +// r, schemaURL, err := detector.Detect(ctx) +// if err != nil { +// p.logger.Warn("failed to detect resource", zap.Error(err)) +// } else { +// mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, schemaURL) +// MergeResource(res, r, false) +// } +// } + +// droppedAttributes := filterAttributes(res.Attributes(), p.attributesToKeep) + +// p.logger.Info("detected resource information", zap.Any("resource", res.Attributes().AsRaw())) +// if len(droppedAttributes) > 0 { +// p.logger.Info("dropped resource information", zap.Strings("resource keys", droppedAttributes)) +// } + +// p.detectedResource.resource = res +// p.detectedResource.schemaURL = mergedSchemaURL +// } + func MergeSchemaURL(currentSchemaURL string, newSchemaURL string) string { if currentSchemaURL == "" { return newSchemaURL diff --git a/processor/resourcedetectionprocessor/resourcedetection_processor.go b/processor/resourcedetectionprocessor/resourcedetection_processor.go index 40d2939ad3541..df9ba391cdbf5 100644 --- a/processor/resourcedetectionprocessor/resourcedetection_processor.go +++ b/processor/resourcedetectionprocessor/resourcedetection_processor.go @@ -31,7 +31,7 @@ func (rdp *resourceDetectionProcessor) Start(ctx context.Context, host component client, _ := rdp.httpClientSettings.ToClient(ctx, host, rdp.telemetrySettings) ctx = internal.ContextWithClient(ctx, client) var err error - rdp.resource, rdp.schemaURL, err = rdp.provider.Get(ctx, client) + rdp.resource, rdp.schemaURL, err = rdp.provider.Get(ctx, client) // here return err }