Skip to content

Commit

Permalink
replaced resource labels with ResourceOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
TylerHelmuth committed May 11, 2022
1 parent 27e9adb commit d9b4910
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestMetadataErrorCases(t *testing.T) {
numMDs int
numLogs int
logMessages []string
detailedPVCLabelsSetterOverride func(volCacheID, volumeClaim, namespace string, labels map[string]string) error
detailedPVCLabelsSetterOverride func(volCacheID, volumeClaim, namespace string) ([]metadata.ResourceOption, error)
}{
{
name: "Fails to get container metadata",
Expand Down Expand Up @@ -178,9 +178,9 @@ func TestMetadataErrorCases(t *testing.T) {
},
},
}, nil),
detailedPVCLabelsSetterOverride: func(volCacheID, volumeClaim, namespace string, labels map[string]string) error {
detailedPVCLabelsSetterOverride: func(volCacheID, volumeClaim, namespace string) ([]metadata.ResourceOption, error) {
// Mock failure cases.
return errors.New("")
return nil, errors.New("")
},
testScenario: func(acc metricDataAccumulator) {
podStats := stats.PodStats{
Expand All @@ -207,7 +207,7 @@ func TestMetadataErrorCases(t *testing.T) {
observedLogger, logs := observer.New(zapcore.WarnLevel)
logger := zap.New(observedLogger)

tt.metadata.DetailedPVCLabelsSetter = tt.detailedPVCLabelsSetterOverride
tt.metadata.DetailedPVCResourceGetter = tt.detailedPVCLabelsSetterOverride
acc := metricDataAccumulator{
metadata: tt.metadata,
logger: logger,
Expand Down
44 changes: 23 additions & 21 deletions receiver/kubeletstatsreceiver/internal/kubelet/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kubelet // import "github.com/open-telemetry/opentelemetry-collector-con
import (
"errors"
"fmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
"regexp"

conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
Expand Down Expand Up @@ -55,18 +56,18 @@ func ValidateMetadataLabelsConfig(labels []MetadataLabel) error {
}

type Metadata struct {
Labels map[MetadataLabel]bool
PodsMetadata *v1.PodList
DetailedPVCLabelsSetter func(volCacheID, volumeClaim, namespace string, labels map[string]string) error
Labels map[MetadataLabel]bool
PodsMetadata *v1.PodList
DetailedPVCResourceGetter func(volCacheID, volumeClaim, namespace string) ([]metadata.ResourceOption, error)
}

func NewMetadata(
labels []MetadataLabel, podsMetadata *v1.PodList,
detailedPVCLabelsSetter func(volCacheID, volumeClaim, namespace string, labels map[string]string) error) Metadata {
detailedPVCResourceGetter func(volCacheID, volumeClaim, namespace string) ([]metadata.ResourceOption, error)) Metadata {
return Metadata{
Labels: getLabelsMap(labels),
PodsMetadata: podsMetadata,
DetailedPVCLabelsSetter: detailedPVCLabelsSetter,
Labels: getLabelsMap(labels),
PodsMetadata: podsMetadata,
DetailedPVCResourceGetter: detailedPVCResourceGetter,
}
}

Expand All @@ -78,45 +79,46 @@ func getLabelsMap(metadataLabels []MetadataLabel) map[MetadataLabel]bool {
return out
}

// setExtraLabels sets extra labels in `labels` map based on provided metadata label.
func (m *Metadata) setExtraLabels(
labels map[string]string, podRef stats.PodReference,
extraMetadataLabel MetadataLabel, extraMetadataFrom string) error {
// getExtraResources gets extra resources based on provided metadata label.
func (m *Metadata) getExtraResources(podRef stats.PodReference, extraMetadataLabel MetadataLabel,
extraMetadataFrom string) ([]metadata.ResourceOption, error) {
// Ensure MetadataLabel exists before proceeding.
if !m.Labels[extraMetadataLabel] || len(m.Labels) == 0 {
return nil
return nil, nil
}

// Cannot proceed, if metadata is unavailable.
if m.PodsMetadata == nil {
return errors.New("pods metadata were not fetched")
return nil, errors.New("pods metadata were not fetched")
}

switch extraMetadataLabel {
case MetadataLabelContainerID:
containerID, err := m.getContainerID(podRef.UID, extraMetadataFrom)
if err != nil {
return err
return nil, err
}
labels[conventions.AttributeContainerID] = containerID
return []metadata.ResourceOption{metadata.WithContainerID(containerID)}, nil
case MetadataLabelVolumeType:
volume, err := m.getPodVolume(podRef.UID, extraMetadataFrom)
if err != nil {
return err
return nil, err
}

getLabelsFromVolume(volume, labels)
ro := getResourcesFromVolume(volume)

// Get more labels from PersistentVolumeClaim volume type.
if volume.PersistentVolumeClaim != nil {
volCacheID := fmt.Sprintf("%s/%s", podRef.UID, extraMetadataFrom)
if err := m.DetailedPVCLabelsSetter(volCacheID, labels[labelPersistentVolumeClaimName], podRef.Namespace,
labels); err != nil {
return fmt.Errorf("failed to set labels from volume claim: %w", err)
pvcResources, err := m.DetailedPVCResourceGetter(volCacheID, volume.PersistentVolumeClaim.ClaimName, podRef.Namespace)
if err != nil {
return nil, fmt.Errorf("failed to set labels from volume claim: %w", err)
}
ro = append(ro, pvcResources...)
}
return ro, nil
}
return nil
return nil, nil
}

// getContainerID retrieves container id from metadata for given pod UID and container name,
Expand Down
58 changes: 36 additions & 22 deletions receiver/kubeletstatsreceiver/internal/kubelet/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
package kubelet

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"

"go.opentelemetry.io/collector/pdata/pcommon"
)

func TestValidateMetadataLabelsConfig(t *testing.T) {
Expand Down Expand Up @@ -75,13 +78,13 @@ func TestSetExtraLabels(t *testing.T) {
metadata Metadata
args []string
wantError string
want map[string]string
want map[string]interface{}
}{
{
name: "no_labels",
metadata: NewMetadata([]MetadataLabel{}, nil, nil),
args: []string{"uid", "container.id", "container"},
want: map[string]string{},
want: map[string]interface{}{},
},
{
name: "set_container_id_valid",
Expand All @@ -103,7 +106,7 @@ func TestSetExtraLabels(t *testing.T) {
},
}, nil),
args: []string{"uid-1234", "container.id", "container1"},
want: map[string]string{
want: map[string]interface{}{
string(MetadataLabelContainerID): "test-container",
},
},
Expand Down Expand Up @@ -166,11 +169,17 @@ func TestSetExtraLabels(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fields := map[string]string{}
err := tt.metadata.setExtraLabels(fields, stats.PodReference{UID: tt.args[0]}, MetadataLabel(tt.args[1]), tt.args[2])
ro, err := tt.metadata.getExtraResources(stats.PodReference{UID: tt.args[0]}, MetadataLabel(tt.args[1]), tt.args[2])

r := pcommon.NewResource()
for _, op := range ro {
op(r)
}

if tt.wantError == "" {
require.NoError(t, err)
assert.EqualValues(t, tt.want, fields)
temp := r.Attributes().AsRaw()
assert.EqualValues(t, tt.want, temp)
} else {
assert.Equal(t, tt.wantError, err.Error())
}
Expand All @@ -184,15 +193,15 @@ func TestSetExtraLabelsForVolumeTypes(t *testing.T) {
name string
vs v1.VolumeSource
args []string
want map[string]string
want map[string]interface{}
}{
{
name: "hostPath",
vs: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{},
},
args: []string{"uid-1234", "k8s.volume.type"},
want: map[string]string{
want: map[string]interface{}{
"k8s.volume.type": "hostPath",
},
},
Expand All @@ -202,7 +211,7 @@ func TestSetExtraLabelsForVolumeTypes(t *testing.T) {
ConfigMap: &v1.ConfigMapVolumeSource{},
},
args: []string{"uid-1234", "k8s.volume.type"},
want: map[string]string{
want: map[string]interface{}{
"k8s.volume.type": "configMap",
},
},
Expand All @@ -212,7 +221,7 @@ func TestSetExtraLabelsForVolumeTypes(t *testing.T) {
EmptyDir: &v1.EmptyDirVolumeSource{},
},
args: []string{"uid-1234", "k8s.volume.type"},
want: map[string]string{
want: map[string]interface{}{
"k8s.volume.type": "emptyDir",
},
},
Expand All @@ -222,7 +231,7 @@ func TestSetExtraLabelsForVolumeTypes(t *testing.T) {
Secret: &v1.SecretVolumeSource{},
},
args: []string{"uid-1234", "k8s.volume.type"},
want: map[string]string{
want: map[string]interface{}{
"k8s.volume.type": "secret",
},
},
Expand All @@ -232,7 +241,7 @@ func TestSetExtraLabelsForVolumeTypes(t *testing.T) {
DownwardAPI: &v1.DownwardAPIVolumeSource{},
},
args: []string{"uid-1234", "k8s.volume.type"},
want: map[string]string{
want: map[string]interface{}{
"k8s.volume.type": "downwardAPI",
},
},
Expand All @@ -244,7 +253,7 @@ func TestSetExtraLabelsForVolumeTypes(t *testing.T) {
},
},
args: []string{"uid-1234", "k8s.volume.type"},
want: map[string]string{
want: map[string]interface{}{
"k8s.volume.type": "persistentVolumeClaim",
"k8s.persistentvolumeclaim.name": "claim-name",
},
Expand All @@ -259,7 +268,7 @@ func TestSetExtraLabelsForVolumeTypes(t *testing.T) {
},
},
args: []string{"uid-1234", "k8s.volume.type"},
want: map[string]string{
want: map[string]interface{}{
"k8s.volume.type": "awsElasticBlockStore",
"aws.volume.id": "volume_id",
"fs.type": "fs_type",
Expand All @@ -276,7 +285,7 @@ func TestSetExtraLabelsForVolumeTypes(t *testing.T) {
},
},
args: []string{"uid-1234", "k8s.volume.type"},
want: map[string]string{
want: map[string]interface{}{
"k8s.volume.type": "gcePersistentDisk",
"gce.pd.name": "pd_name",
"fs.type": "fs_type",
Expand All @@ -292,7 +301,7 @@ func TestSetExtraLabelsForVolumeTypes(t *testing.T) {
},
},
args: []string{"uid-1234", "k8s.volume.type"},
want: map[string]string{
want: map[string]interface{}{
"k8s.volume.type": "glusterfs",
"glusterfs.endpoints.name": "endspoints_name",
"glusterfs.path": "path",
Expand All @@ -302,12 +311,11 @@ func TestSetExtraLabelsForVolumeTypes(t *testing.T) {
name: "unsupported type",
vs: v1.VolumeSource{},
args: []string{"uid-1234", "k8s.volume.type"},
want: map[string]string{},
want: map[string]interface{}{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fields := map[string]string{}
volName := "volume0"
metadata := NewMetadata([]MetadataLabel{MetadataLabelVolumeType}, &v1.PodList{
Items: []v1.Pod{
Expand All @@ -325,11 +333,17 @@ func TestSetExtraLabelsForVolumeTypes(t *testing.T) {
},
},
},
}, func(volCacheID, volumeClaim, namespace string, labels map[string]string) error {
return nil
}, func(volCacheID, volumeClaim, namespace string) ([]metadata.ResourceOption, error) {
return nil, nil
})
metadata.setExtraLabels(fields, stats.PodReference{UID: tt.args[0]}, MetadataLabel(tt.args[1]), volName)
assert.Equal(t, tt.want, fields)
ro, _ := metadata.getExtraResources(stats.PodReference{UID: tt.args[0]}, MetadataLabel(tt.args[1]), volName)

r := pcommon.NewResource()
for _, op := range ro {
op(r)
}

assert.Equal(t, tt.want, r.Attributes().AsRaw())
})
}
}
63 changes: 27 additions & 36 deletions receiver/kubeletstatsreceiver/internal/kubelet/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,52 +16,43 @@ package kubelet // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"fmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"

"go.opentelemetry.io/collector/pdata/pcommon"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
)

func fillNodeResource(dest pcommon.Resource, s stats.NodeStats) {
dest.Attributes().UpsertString(conventions.AttributeK8SNodeName, s.NodeName)
}

func fillPodResource(dest pcommon.Resource, s stats.PodStats) {
dest.Attributes().UpsertString(conventions.AttributeK8SPodUID, s.PodRef.UID)
dest.Attributes().UpsertString(conventions.AttributeK8SPodName, s.PodRef.Name)
dest.Attributes().UpsertString(conventions.AttributeK8SNamespaceName, s.PodRef.Namespace)
}

func fillContainerResource(dest pcommon.Resource, sPod stats.PodStats, sContainer stats.ContainerStats, metadata Metadata) error {
labels := map[string]string{
conventions.AttributeK8SPodUID: sPod.PodRef.UID,
conventions.AttributeK8SPodName: sPod.PodRef.Name,
conventions.AttributeK8SNamespaceName: sPod.PodRef.Namespace,
conventions.AttributeK8SContainerName: sContainer.Name,
}
if err := metadata.setExtraLabels(labels, sPod.PodRef, MetadataLabelContainerID, sContainer.Name); err != nil {
return fmt.Errorf("failed to set extra labels from metadata: %w", err)
func getContainerResourceOptions(sPod stats.PodStats, sContainer stats.ContainerStats, k8sMetadata Metadata) ([]metadata.ResourceOption, error) {
ro := []metadata.ResourceOption{
metadata.WithK8sPodUID(sPod.PodRef.UID),
metadata.WithK8sPodName(sPod.PodRef.Name),
metadata.WithK8sNamespaceName(sPod.PodRef.Namespace),
metadata.WithContainerName(sContainer.Name),
}
for k, v := range labels {
dest.Attributes().UpsertString(k, v)

extraResources, err := k8sMetadata.getExtraResources(sPod.PodRef, MetadataLabelContainerID, sContainer.Name)
if err != nil {
return nil, fmt.Errorf("failed to set extra labels from metadata: %w", err)
}
return nil

ro = append(ro, extraResources...)

return ro, nil
}

func fillVolumeResource(dest pcommon.Resource, sPod stats.PodStats, vs stats.VolumeStats, metadata Metadata) error {
labels := map[string]string{
conventions.AttributeK8SPodUID: sPod.PodRef.UID,
conventions.AttributeK8SPodName: sPod.PodRef.Name,
conventions.AttributeK8SNamespaceName: sPod.PodRef.Namespace,
labelVolumeName: vs.Name,
func getVolumeResourceOptions(sPod stats.PodStats, vs stats.VolumeStats, k8sMetadata Metadata) ([]metadata.ResourceOption, error) {
ro := []metadata.ResourceOption{
metadata.WithK8sPodUID(sPod.PodRef.UID),
metadata.WithK8sPodName(sPod.PodRef.Name),
metadata.WithK8sNamespaceName(sPod.PodRef.Namespace),
metadata.WithK8sVolumeName(vs.Name),
}

if err := metadata.setExtraLabels(labels, sPod.PodRef, MetadataLabelVolumeType, vs.Name); err != nil {
return fmt.Errorf("failed to set extra labels from metadata: %w", err)
extraResources, err := k8sMetadata.getExtraResources(sPod.PodRef, MetadataLabelVolumeType, vs.Name)
if err != nil {
return nil, fmt.Errorf("failed to set extra labels from metadata: %w", err)
}

for k, v := range labels {
dest.Attributes().UpsertString(k, v)
}
return nil
ro = append(ro, extraResources...)

return ro, nil
}
Loading

0 comments on commit d9b4910

Please sign in to comment.