Skip to content

Commit

Permalink
Update host metrics receiver to use receiverhelper
Browse files Browse the repository at this point in the history
  • Loading branch information
james-bebbington committed Oct 15, 2020
1 parent 77f7248 commit a90bbee
Show file tree
Hide file tree
Showing 55 changed files with 614 additions and 988 deletions.
3 changes: 2 additions & 1 deletion component/componenterror/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ func CombineErrors(errs []error) error {
}
if partialError, isPartial := err.(consumererror.PartialScrapeError); isPartial {
partialScrapeErr = true
failedScrapeCount += partialError.GetFailedCount()
failedScrapeCount += partialError.Failed
}

errMsgs = append(errMsgs, err.Error())
}

Expand Down
2 changes: 1 addition & 1 deletion component/componenterror/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestCombineErrors(t *testing.T) {
assert.Equal(t, tc.expectedPartialScrapeErr, isPartial)

if tc.expectedPartialScrapeErr && isPartial {
assert.Equal(t, tc.expectedFailedScrapeCount, partialErr.GetFailedCount())
assert.Equal(t, tc.expectedFailedScrapeCount, partialErr.Failed)
}
}
}
9 changes: 2 additions & 7 deletions consumer/consumererror/partialscrapeerror.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,18 @@ package consumererror
// to be scraped
type PartialScrapeError struct {
error
failed int
Failed int
}

// NewPartialScrapeError creates PartialScrapeError for failed metrics.
// Use this error type only when a subset of data was failed to be scraped.
func NewPartialScrapeError(err error, failed int) error {
return PartialScrapeError{
error: err,
failed: failed,
Failed: failed,
}
}

// GetFailedCount returns the number of metrics that were failed to be scraped.
func (err PartialScrapeError) GetFailedCount() int {
return err.failed
}

// IsPartialScrapeError checks if an error was wrapped with PartialScrapeError.
func IsPartialScrapeError(err error) bool {
if err == nil {
Expand Down
2 changes: 1 addition & 1 deletion consumer/consumererror/partialscrapeerror_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestPartialScrapeError(t *testing.T) {
err := fmt.Errorf("some error")
partialErr := NewPartialScrapeError(err, failed)
assert.Equal(t, err.Error(), partialErr.Error())
assert.Equal(t, failed, partialErr.(PartialScrapeError).failed)
assert.Equal(t, failed, partialErr.(PartialScrapeError).Failed)
}

func TestIsPartialScrapeError(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion obsreport/obsreport_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func EndMetricsScrapeOp(
numErroredMetrics := 0
if err != nil {
if partialErr, isPartial := err.(consumererror.PartialScrapeError); isPartial {
numErroredMetrics = partialErr.GetFailedCount()
numErroredMetrics = partialErr.Failed
} else {
numErroredMetrics = numScrapedMetrics
numScrapedMetrics = 0
Expand Down
10 changes: 3 additions & 7 deletions receiver/hostmetricsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,12 @@
package hostmetricsreceiver

import (
"time"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

// Config defines configuration for HostMetrics receiver.
type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"`

CollectionInterval time.Duration `mapstructure:"collection_interval"`
Scrapers map[string]internal.Config `mapstructure:"-"`
receiverhelper.ScraperControllerSettings `mapstructure:",squash"`
Scrapers map[string]internal.Config `mapstructure:"-"`
}
11 changes: 7 additions & 4 deletions receiver/hostmetricsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/processesscraper"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/processscraper"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/swapscraper"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -61,11 +62,13 @@ func TestLoadConfig(t *testing.T) {

r1 := cfg.Receivers["hostmetrics/customname"].(*Config)
expectedConfig := &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "hostmetrics/customname",
ScraperControllerSettings: receiverhelper.ScraperControllerSettings{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "hostmetrics/customname",
},
CollectionInterval: 30 * time.Second,
},
CollectionInterval: 30 * time.Second,
Scrapers: map[string]internal.Config{
cpuscraper.TypeStr: &cpuscraper.Config{},
diskscraper.TypeStr: &diskscraper.Config{},
Expand Down
82 changes: 68 additions & 14 deletions receiver/hostmetricsreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/spf13/viper"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -88,10 +88,6 @@ func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{})
return fmt.Errorf("config type not hostmetrics.Config")
}

if cfg.CollectionInterval <= 0 {
return fmt.Errorf("collection_interval must be a positive duration")
}

// dynamically load the individual collector configs based on the key name

cfg.Scrapers = map[string]internal.Config{}
Expand Down Expand Up @@ -140,13 +136,7 @@ func getScraperFactory(key string) (internal.BaseFactory, bool) {

// createDefaultConfig creates the default configuration for receiver.
func createDefaultConfig() configmodels.Receiver {
return &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
CollectionInterval: time.Minute,
}
return &Config{ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings(typeStr)}
}

// createMetricsReceiver creates a metrics receiver based on provided config.
Expand All @@ -158,10 +148,74 @@ func createMetricsReceiver(
) (component.MetricsReceiver, error) {
oCfg := cfg.(*Config)

hmr, err := newHostMetricsReceiver(ctx, params.Logger, oCfg, scraperFactories, resourceScraperFactories, consumer)
addScraperOptions, err := createAddScraperOptions(ctx, params.Logger, oCfg, scraperFactories, resourceScraperFactories)
if err != nil {
return nil, err
}

return hmr, nil
return receiverhelper.NewScraperControllerReceiver(
&oCfg.ScraperControllerSettings,
consumer,
addScraperOptions...,
)
}

func createAddScraperOptions(
ctx context.Context,
logger *zap.Logger,
config *Config,
factories map[string]internal.ScraperFactory,
resourceFactories map[string]internal.ResourceScraperFactory,
) ([]receiverhelper.ScraperControllerOption, error) {
scraperControllerOptions := make([]receiverhelper.ScraperControllerOption, 0, len(config.Scrapers))

for key, cfg := range config.Scrapers {
hostMetricsScraper, ok, err := createHostMetricsScraper(ctx, logger, key, cfg, factories)
if err != nil {
return nil, fmt.Errorf("failed to create scraper for key %q: %w", key, err)
}

if ok {
scraperControllerOptions = append(scraperControllerOptions, receiverhelper.AddMetricsScraper(hostMetricsScraper))
continue
}

resourceMetricsScraper, ok, err := createResourceMetricsScraper(ctx, logger, key, cfg, resourceFactories)
if err != nil {
return nil, fmt.Errorf("failed to create resource scraper for key %q: %w", key, err)
}

if ok {
scraperControllerOptions = append(scraperControllerOptions, receiverhelper.AddResourceMetricsScraper(resourceMetricsScraper))
continue
}

return nil, fmt.Errorf("host metrics scraper factory not found for key: %q", key)
}

return scraperControllerOptions, nil
}

func createHostMetricsScraper(ctx context.Context, logger *zap.Logger, key string, cfg internal.Config, factories map[string]internal.ScraperFactory) (scraper receiverhelper.MetricsScraper, ok bool, err error) {
factory := factories[key]
if factory == nil {
ok = false
return
}

ok = true
scraper, err = factory.CreateMetricsScraper(ctx, logger, cfg)
return
}

func createResourceMetricsScraper(ctx context.Context, logger *zap.Logger, key string, cfg internal.Config, factories map[string]internal.ResourceScraperFactory) (scraper receiverhelper.ResourceMetricsScraper, ok bool, err error) {
factory := factories[key]
if factory == nil {
ok = false
return
}

ok = true
scraper, err = factory.CreateMetricsScraper(ctx, logger, cfg)
return
}
7 changes: 4 additions & 3 deletions receiver/hostmetricsreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
)

Expand All @@ -41,12 +42,12 @@ func TestCreateReceiver(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

tReceiver, err := factory.CreateTraceReceiver(context.Background(), creationParams, cfg, nil)
tReceiver, err := factory.CreateTraceReceiver(context.Background(), creationParams, cfg, &exportertest.SinkTraceExporter{})

assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
assert.Nil(t, tReceiver)

mReceiver, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, nil)
mReceiver, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, &exportertest.SinkMetricsExporter{})

assert.NoError(t, err)
assert.NotNil(t, mReceiver)
Expand All @@ -58,6 +59,6 @@ func TestCreateReceiver_ScraperKeyConfigError(t *testing.T) {
factory := NewFactory()
cfg := &Config{Scrapers: map[string]internal.Config{errorKey: &mockConfig{}}}

_, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, nil)
_, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, &exportertest.SinkMetricsExporter{})
assert.EqualError(t, err, fmt.Sprintf("host metrics scraper factory not found for key: %q", errorKey))
}
Loading

0 comments on commit a90bbee

Please sign in to comment.