Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/kubeletstats] Migrate kubeletstatsreceiver to the new Metrics Builder #9744

Merged
merged 20 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
- `transformprocessor`: Add new `limit` function to allow limiting the number of items in a map, such as the number of attributes in `attributes` or `resource.attributes` (#9552)
- `processor/attributes`: Support attributes set by server authenticator (#9420)
- `datadogexporter`: Experimental support for Exponential Histograms with delta aggregation temporality (#8350)
- `kubeletstatsreceiver` Update receiver to use new Metrics Builder. All emitted metrics remain the same. (#9744)

### 🧰 Bug fixes 🧰

Expand Down
4 changes: 4 additions & 0 deletions receiver/kubeletstatsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
kube "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
)

var _ config.Receiver = (*Config)(nil)
Expand All @@ -48,6 +49,9 @@ type Config struct {

// Configuration of the Kubernetes API client.
K8sAPIConfig *k8sconfig.APIConfig `mapstructure:"k8s_api_config"`

// Metrics allows customizing scraped metrics representation.
Metrics metadata.MetricsSettings `mapstructure:"metrics"`
}

func (cfg *Config) Validate() error {
Expand Down
7 changes: 7 additions & 0 deletions receiver/kubeletstatsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
kube "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
)

func TestLoadConfig(t *testing.T) {
Expand Down Expand Up @@ -59,6 +60,7 @@ func TestLoadConfig(t *testing.T) {
kubelet.PodMetricGroup,
kubelet.NodeMetricGroup,
},
Metrics: metadata.DefaultMetricsSettings(),
}, defaultCfg)

tlsCfg := cfg.Receivers[config.NewComponentIDWithName(typeStr, "tls")].(*Config)
Expand Down Expand Up @@ -86,6 +88,7 @@ func TestLoadConfig(t *testing.T) {
kubelet.PodMetricGroup,
kubelet.NodeMetricGroup,
},
Metrics: metadata.DefaultMetricsSettings(),
}, tlsCfg)

saCfg := cfg.Receivers[config.NewComponentIDWithName(typeStr, "sa")].(*Config)
Expand All @@ -105,6 +108,7 @@ func TestLoadConfig(t *testing.T) {
kubelet.PodMetricGroup,
kubelet.NodeMetricGroup,
},
Metrics: metadata.DefaultMetricsSettings(),
}, saCfg)

metadataCfg := cfg.Receivers[config.NewComponentIDWithName(typeStr, "metadata")].(*Config)
Expand All @@ -127,6 +131,7 @@ func TestLoadConfig(t *testing.T) {
kubelet.PodMetricGroup,
kubelet.NodeMetricGroup,
},
Metrics: metadata.DefaultMetricsSettings(),
}, metadataCfg)

metricGroupsCfg := cfg.Receivers[config.NewComponentIDWithName(typeStr, "metric_groups")].(*Config)
Expand All @@ -145,6 +150,7 @@ func TestLoadConfig(t *testing.T) {
kubelet.NodeMetricGroup,
kubelet.VolumeMetricGroup,
},
Metrics: metadata.DefaultMetricsSettings(),
}, metricGroupsCfg)

metadataWithK8sAPICfg := cfg.Receivers[config.NewComponentIDWithName(typeStr, "metadata_with_k8s_api")].(*Config)
Expand All @@ -167,6 +173,7 @@ func TestLoadConfig(t *testing.T) {
kubelet.NodeMetricGroup,
},
K8sAPIConfig: &k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeKubeConfig},
Metrics: metadata.DefaultMetricsSettings(),
}, metadataWithK8sAPICfg)
}

Expand Down
2 changes: 1 addition & 1 deletion receiver/kubeletstatsreceiver/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
//go:build !windows
// +build !windows

//go:generate mdatagen metadata.yaml
//go:generate mdatagen --experimental-gen metadata.yaml

package kubeletstatsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver"
9 changes: 8 additions & 1 deletion receiver/kubeletstatsreceiver/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@ These are the metrics available for this scraper.
| **k8s.volume.inodes.free** | The free inodes in the filesystem. | 1 | Gauge(Int) | <ul> </ul> |
| **k8s.volume.inodes.used** | The inodes used by the filesystem. This may not equal inodes - free because filesystem may share inodes with other filesystems. | 1 | Gauge(Int) | <ul> </ul> |

**Highlighted metrics** are emitted by default.
**Highlighted metrics** are emitted by default. Other metrics are optional and not emitted by default.
Any metric can be enabled or disabled with the following scraper configuration:

```yaml
metrics:
<metric_name>:
enabled: <true|false>
```

## Resource attributes

Expand Down
4 changes: 3 additions & 1 deletion receiver/kubeletstatsreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
kube "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
)

const (
Expand Down Expand Up @@ -59,6 +60,7 @@ func createDefaultConfig() config.Receiver {
AuthType: k8sconfig.AuthTypeTLS,
},
},
Metrics: metadata.DefaultMetricsSettings(),
}
}

Expand All @@ -78,7 +80,7 @@ func createMetricsReceiver(
return nil, err
}

scrp, err := newKubletScraper(rest, set, rOptions)
scrp, err := newKubletScraper(rest, set, rOptions, cfg.Metrics)
if err != nil {
return nil, err
}
Expand Down
82 changes: 30 additions & 52 deletions receiver/kubeletstatsreceiver/internal/kubelet/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,66 +49,49 @@ type metricDataAccumulator struct {
logger *zap.Logger
metricGroupsToCollect map[MetricGroup]bool
time time.Time
mbs *metadata.MetricsBuilders
}

const (
scopeName = "otelcol/kubeletstatsreceiver"
)

func (a *metricDataAccumulator) nodeStats(s stats.NodeStats) {
if !a.metricGroupsToCollect[NodeMetricGroup] {
return
}

md := pmetric.NewMetrics()
rm := md.ResourceMetrics().AppendEmpty()
fillNodeResource(rm.Resource(), s)

ilm := rm.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName(scopeName)

startTime := pcommon.NewTimestampFromTime(s.StartTime.Time)
dmitryax marked this conversation as resolved.
Show resolved Hide resolved
a.mbs.WithNodeStartTime.Reset(metadata.WithStartTime(pcommon.NewTimestampFromTime(s.StartTime.Time)))
dmitryax marked this conversation as resolved.
Show resolved Hide resolved
currentTime := pcommon.NewTimestampFromTime(a.time)
addCPUMetrics(ilm.Metrics(), metadata.NodeCPUMetrics, s.CPU, startTime, currentTime)
addMemoryMetrics(ilm.Metrics(), metadata.NodeMemoryMetrics, s.Memory, currentTime)
addFilesystemMetrics(ilm.Metrics(), metadata.NodeFilesystemMetrics, s.Fs, currentTime)
addNetworkMetrics(ilm.Metrics(), metadata.NodeNetworkMetrics, s.Network, startTime, currentTime)

addCPUMetrics(a.mbs.WithNodeStartTime, metadata.NodeCPUMetrics, s.CPU, currentTime)
addMemoryMetrics(a.mbs.WithNodeStartTime, metadata.NodeMemoryMetrics, s.Memory, currentTime)
addFilesystemMetrics(a.mbs.WithNodeStartTime, metadata.NodeFilesystemMetrics, s.Fs, currentTime)
addNetworkMetrics(a.mbs.WithNodeStartTime, metadata.NodeNetworkMetrics, s.Network, currentTime)
// todo s.Runtime.ImageFs

a.m = append(a.m, md)
a.m = append(a.m, a.mbs.WithNodeStartTime.Emit(metadata.WithK8sNodeName(s.NodeName)))
}

func (a *metricDataAccumulator) podStats(s stats.PodStats) {
if !a.metricGroupsToCollect[PodMetricGroup] {
return
}

md := pmetric.NewMetrics()
rm := md.ResourceMetrics().AppendEmpty()
fillPodResource(rm.Resource(), s)

ilm := rm.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName(scopeName)

startTime := pcommon.NewTimestampFromTime(s.StartTime.Time)
a.mbs.WithPodStartTime.Reset(metadata.WithStartTime(pcommon.NewTimestampFromTime(s.StartTime.Time)))
currentTime := pcommon.NewTimestampFromTime(a.time)
addCPUMetrics(ilm.Metrics(), metadata.PodCPUMetrics, s.CPU, startTime, currentTime)
addMemoryMetrics(ilm.Metrics(), metadata.PodMemoryMetrics, s.Memory, currentTime)
addFilesystemMetrics(ilm.Metrics(), metadata.PodFilesystemMetrics, s.EphemeralStorage, currentTime)
addNetworkMetrics(ilm.Metrics(), metadata.PodNetworkMetrics, s.Network, startTime, currentTime)

a.m = append(a.m, md)
addCPUMetrics(a.mbs.WithPodStartTime, metadata.PodCPUMetrics, s.CPU, currentTime)
addMemoryMetrics(a.mbs.WithPodStartTime, metadata.PodMemoryMetrics, s.Memory, currentTime)
addFilesystemMetrics(a.mbs.WithPodStartTime, metadata.PodFilesystemMetrics, s.EphemeralStorage, currentTime)
addNetworkMetrics(a.mbs.WithPodStartTime, metadata.PodNetworkMetrics, s.Network, currentTime)

a.m = append(a.m, a.mbs.WithPodStartTime.Emit(metadata.WithK8sPodUID(s.PodRef.UID),
metadata.WithK8sPodName(s.PodRef.Name), metadata.WithK8sNamespaceName(s.PodRef.Namespace)))
}

func (a *metricDataAccumulator) containerStats(sPod stats.PodStats, s stats.ContainerStats) {
if !a.metricGroupsToCollect[ContainerMetricGroup] {
return
}

md := pmetric.NewMetrics()
rm := md.ResourceMetrics().AppendEmpty()

if err := fillContainerResource(rm.Resource(), sPod, s, a.metadata); err != nil {
ro, err := getContainerResourceOptions(sPod, s, a.metadata)
if err != nil {
a.logger.Warn(
"failed to fetch container metrics",
zap.String("pod", sPod.PodRef.Name),
Expand All @@ -117,26 +100,23 @@ func (a *metricDataAccumulator) containerStats(sPod stats.PodStats, s stats.Cont
return
}

ilm := rm.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName(scopeName)

startTime := pcommon.NewTimestampFromTime(s.StartTime.Time)
a.mbs.WithPodStartTime.Reset(metadata.WithStartTime(pcommon.NewTimestampFromTime(s.StartTime.Time)))
currentTime := pcommon.NewTimestampFromTime(a.time)
addCPUMetrics(ilm.Metrics(), metadata.ContainerCPUMetrics, s.CPU, startTime, currentTime)
addMemoryMetrics(ilm.Metrics(), metadata.ContainerMemoryMetrics, s.Memory, currentTime)
addFilesystemMetrics(ilm.Metrics(), metadata.ContainerFilesystemMetrics, s.Rootfs, currentTime)
a.m = append(a.m, md)

addCPUMetrics(a.mbs.WithPodStartTime, metadata.ContainerCPUMetrics, s.CPU, currentTime)
addMemoryMetrics(a.mbs.WithPodStartTime, metadata.ContainerMemoryMetrics, s.Memory, currentTime)
addFilesystemMetrics(a.mbs.WithPodStartTime, metadata.ContainerFilesystemMetrics, s.Rootfs, currentTime)

a.m = append(a.m, a.mbs.WithPodStartTime.Emit(ro...))
}

func (a *metricDataAccumulator) volumeStats(sPod stats.PodStats, s stats.VolumeStats) {
if !a.metricGroupsToCollect[VolumeMetricGroup] {
return
}

md := pmetric.NewMetrics()
rm := md.ResourceMetrics().AppendEmpty()

if err := fillVolumeResource(rm.Resource(), sPod, s, a.metadata); err != nil {
ro, err := getVolumeResourceOptions(sPod, s, a.metadata)
if err != nil {
a.logger.Warn(
"Failed to gather additional volume metadata. Skipping metric collection.",
zap.String("pod", sPod.PodRef.Name),
Expand All @@ -145,10 +125,8 @@ func (a *metricDataAccumulator) volumeStats(sPod stats.PodStats, s stats.VolumeS
return
}

ilm := rm.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName(scopeName)

currentTime := pcommon.NewTimestampFromTime(a.time)
addVolumeMetrics(ilm.Metrics(), metadata.K8sVolumeMetrics, s, currentTime)
a.m = append(a.m, md)
addVolumeMetrics(a.mbs.WithDefaultStartTime, metadata.K8sVolumeMetrics, s, currentTime)

a.m = append(a.m, a.mbs.WithDefaultStartTime.Emit(ro...))
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
)

// TestMetadataErrorCases walks through the error cases of collecting
Expand All @@ -39,7 +41,7 @@ func TestMetadataErrorCases(t *testing.T) {
numMDs int
numLogs int
logMessages []string
detailedPVCLabelsSetterOverride func(volCacheID, volumeClaim, namespace string, labels map[string]string) error
detailedPVCLabelsSetterOverride func(volCacheID, volumeClaim, namespace string) ([]metadata.ResourceOption, error)
}{
{
name: "Fails to get container metadata",
Expand Down Expand Up @@ -176,9 +178,9 @@ func TestMetadataErrorCases(t *testing.T) {
},
},
}, nil),
detailedPVCLabelsSetterOverride: func(volCacheID, volumeClaim, namespace string, labels map[string]string) error {
detailedPVCLabelsSetterOverride: func(volCacheID, volumeClaim, namespace string) ([]metadata.ResourceOption, error) {
// Mock failure cases.
return errors.New("")
return nil, errors.New("")
},
testScenario: func(acc metricDataAccumulator) {
podStats := stats.PodStats{
Expand All @@ -205,11 +207,16 @@ func TestMetadataErrorCases(t *testing.T) {
observedLogger, logs := observer.New(zapcore.WarnLevel)
logger := zap.New(observedLogger)

tt.metadata.DetailedPVCLabelsSetter = tt.detailedPVCLabelsSetterOverride
tt.metadata.DetailedPVCResourceGetter = tt.detailedPVCLabelsSetterOverride
acc := metricDataAccumulator{
metadata: tt.metadata,
logger: logger,
metricGroupsToCollect: tt.metricGroupsToCollect,
mbs: &metadata.MetricsBuilders{
WithNodeStartTime: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings()),
WithPodStartTime: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings()),
WithDefaultStartTime: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings()),
},
}

tt.testScenario(acc)
Expand All @@ -231,6 +238,11 @@ func TestNilHandling(t *testing.T) {
ContainerMetricGroup: true,
VolumeMetricGroup: true,
},
mbs: &metadata.MetricsBuilders{
WithNodeStartTime: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings()),
WithPodStartTime: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings()),
WithDefaultStartTime: metadata.NewMetricsBuilder(metadata.DefaultMetricsSettings()),
},
}
assert.NotPanics(t, func() {
acc.nodeStats(stats.NodeStats{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
package kubelet // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet"

const (
labelPersistentVolumeClaimName = "k8s.persistentvolumeclaim.name"
labelVolumeName = "k8s.volume.name"
labelVolumeType = "k8s.volume.type"
labelVolumeType = "k8s.volume.type"

// Volume types.
labelValuePersistentVolumeClaim = "persistentVolumeClaim"
Expand Down
15 changes: 7 additions & 8 deletions receiver/kubeletstatsreceiver/internal/kubelet/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,31 @@ package kubelet // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
)

func addCPUMetrics(dest pmetric.MetricSlice, cpuMetrics metadata.CPUMetrics, s *stats.CPUStats, startTime pcommon.Timestamp, currentTime pcommon.Timestamp) {
func addCPUMetrics(mb *metadata.MetricsBuilder, cpuMetrics metadata.CPUMetrics, s *stats.CPUStats, currentTime pcommon.Timestamp) {
if s == nil {
return
}
addCPUUsageMetric(dest, cpuMetrics.Utilization, s, currentTime)
addCPUTimeMetric(dest, cpuMetrics.Time, s, startTime, currentTime)
addCPUUsageMetric(mb, cpuMetrics.Utilization, s, currentTime)
addCPUTimeMetric(mb, cpuMetrics.Time, s, currentTime)
}

func addCPUUsageMetric(dest pmetric.MetricSlice, metricInt metadata.MetricIntf, s *stats.CPUStats, currentTime pcommon.Timestamp) {
func addCPUUsageMetric(mb *metadata.MetricsBuilder, recordDataPoint metadata.RecordDoubleDataPointFunc, s *stats.CPUStats, currentTime pcommon.Timestamp) {
if s.UsageNanoCores == nil {
return
}
value := float64(*s.UsageNanoCores) / 1_000_000_000
fillDoubleGauge(dest.AppendEmpty(), metricInt, value, currentTime)
recordDataPoint(mb, currentTime, value)
}

func addCPUTimeMetric(dest pmetric.MetricSlice, metricInt metadata.MetricIntf, s *stats.CPUStats, startTime pcommon.Timestamp, currentTime pcommon.Timestamp) {
func addCPUTimeMetric(mb *metadata.MetricsBuilder, recordDataPoint metadata.RecordDoubleDataPointFunc, s *stats.CPUStats, currentTime pcommon.Timestamp) {
if s.UsageCoreNanoSeconds == nil {
return
}
value := float64(*s.UsageCoreNanoSeconds) / 1_000_000_000
fillDoubleSum(dest.AppendEmpty(), metricInt, value, startTime, currentTime)
recordDataPoint(mb, currentTime, value)
}
9 changes: 4 additions & 5 deletions receiver/kubeletstatsreceiver/internal/kubelet/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@ package kubelet // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
)

func addFilesystemMetrics(dest pmetric.MetricSlice, filesystemMetrics metadata.FilesystemMetrics, s *stats.FsStats, currentTime pcommon.Timestamp) {
func addFilesystemMetrics(mb *metadata.MetricsBuilder, filesystemMetrics metadata.FilesystemMetrics, s *stats.FsStats, currentTime pcommon.Timestamp) {
if s == nil {
return
}

addIntGauge(dest, filesystemMetrics.Available, s.AvailableBytes, currentTime)
addIntGauge(dest, filesystemMetrics.Capacity, s.CapacityBytes, currentTime)
addIntGauge(dest, filesystemMetrics.Usage, s.UsedBytes, currentTime)
recordIntDataPoint(mb, filesystemMetrics.Available, s.AvailableBytes, currentTime)
recordIntDataPoint(mb, filesystemMetrics.Capacity, s.CapacityBytes, currentTime)
recordIntDataPoint(mb, filesystemMetrics.Usage, s.UsedBytes, currentTime)
}
Loading