From 9c4df9e5447d2d89187315ffd63f379abf120d50 Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Mon, 22 May 2023 17:00:15 -0500 Subject: [PATCH 01/17] add 6 more k8sapiserver metrics to container insights plugin --- .../k8sapiserver/prometheus_consumer.go | 100 +++++++++++++----- .../k8sapiserver/prometheus_consumer_test.go | 31 +++++- 2 files changed, 104 insertions(+), 27 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go index 0606b3a2ccc4..e0e159c23e7c 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go @@ -16,27 +16,51 @@ package k8sapiserver // import "github.com/open-telemetry/opentelemetry-collecto import ( "context" - "strconv" - "time" + "fmt" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" ) +const ( + controlPlaneResourceType = "control_plane" +) + +var ( + defaultResourceToType = map[string]string{ + controlPlaneResourceType: "Cluster", + } + defaultMetricsToResource = map[string]string{ + "apiserver_storage_objects": controlPlaneResourceType, + "apiserver_request_total": controlPlaneResourceType, + "apiserver_request_duration_seconds": controlPlaneResourceType, + "apiserver_admission_controller_admission_duration_seconds": controlPlaneResourceType, + "rest_client_request_duration_seconds": controlPlaneResourceType, + "etcd_request_duration_seconds": controlPlaneResourceType, + "etcd_db_total_size_in_bytes": controlPlaneResourceType, + } +) + type prometheusConsumer struct { - nextConsumer consumer.Metrics - logger *zap.Logger - clusterName string - nodeName string + nextConsumer consumer.Metrics + logger *zap.Logger + clusterName string + nodeName string + resource pcommon.Resource + resourcesToType map[string]string + metricsToResource map[string]string } func newPrometheusConsumer(logger *zap.Logger, nextConsumer consumer.Metrics, clusterName string, nodeName string) prometheusConsumer { return prometheusConsumer{ - logger: logger, - nextConsumer: nextConsumer, - clusterName: clusterName, - nodeName: nodeName, + logger: logger, + nextConsumer: nextConsumer, + clusterName: clusterName, + nodeName: nodeName, + resourcesToType: defaultResourceToType, + metricsToResource: defaultMetricsToResource, } } func (c prometheusConsumer) Capabilities() consumer.Capabilities { @@ -45,24 +69,52 @@ func (c prometheusConsumer) Capabilities() consumer.Capabilities { } } -func (c prometheusConsumer) ConsumeMetrics(ctx context.Context, ld pmetric.Metrics) error { - rms := ld.ResourceMetrics() +func (c prometheusConsumer) ConsumeMetrics(ctx context.Context, originalMetrics pmetric.Metrics) error { - for i := 0; i < rms.Len(); i++ { + localScopeMetrics := map[string]pmetric.ScopeMetrics{} + newMetrics := pmetric.NewMetrics() - rm := rms.At(i) - timestampNs := strconv.FormatInt(time.Now().UnixNano(), 10) + for key, value := range c.resourcesToType { + newResourceMetrics := newMetrics.ResourceMetrics().AppendEmpty() + // common attributes + newResourceMetrics.Resource().Attributes().PutStr("ClusterName", c.clusterName) + newResourceMetrics.Resource().Attributes().PutStr("Version", "0") + newResourceMetrics.Resource().Attributes().PutStr("Sources", "[\"apiserver\"]") + newResourceMetrics.Resource().Attributes().PutStr("NodeName", 
c.nodeName) - rm.Resource().Attributes().PutStr("ClusterName", c.clusterName) - rm.Resource().Attributes().PutStr("Type", "Cluster") - rm.Resource().Attributes().PutStr("Timestamp", timestampNs) - rm.Resource().Attributes().PutStr("Version", "0") - rm.Resource().Attributes().PutStr("Sources", "[\"apiserver\"]") - rm.Resource().Attributes().PutStr("NodeName", c.nodeName) + // resource-specific type metric + newResourceMetrics.Resource().Attributes().PutStr("Type", value) - // TODO: need to separate out metrics by type (cluster, service, etc) + newScopeMetrics := newResourceMetrics.ScopeMetrics().AppendEmpty() + localScopeMetrics[key] = newScopeMetrics } - // forward on the metrics - return c.nextConsumer.ConsumeMetrics(ctx, ld) + rms := originalMetrics.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + scopeMetrics := rms.At(i).ScopeMetrics() + for j := 0; j < scopeMetrics.Len(); j++ { + scopeMetric := scopeMetrics.At(j) + for k := 0; k < scopeMetric.Metrics().Len(); k++ { + metric := scopeMetric.Metrics().At(k) + // check control plane metrics + resourceName, ok := c.metricsToResource[metric.Name()] + if !ok { + continue + } + resourceSpecificScopeMetrics, ok := localScopeMetrics[resourceName] + if !ok { + continue + } + c.logger.Debug(fmt.Sprintf("Copying metric %s into resource %s", metric.Name(), resourceName)) + metric.CopyTo(resourceSpecificScopeMetrics.Metrics().AppendEmpty()) + } + } + } + + c.logger.Info("Forwarding on k8sapiserver prometheus metrics", + zap.Int("MetricCount", newMetrics.MetricCount()), + zap.Int("DataPointCount", newMetrics.DataPointCount())) + + // forward on the new metrics + return c.nextConsumer.ConsumeMetrics(ctx, newMetrics) } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer_test.go index 3eae3d1ce007..b5ad24e4535e 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer_test.go @@ -55,8 +55,8 @@ func (m mockNextConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics assert.NotEmpty(m.t, value.Str()) value, found = md.ResourceMetrics().At(0).Resource().Attributes().Get("Timestamp") - assert.True(m.t, found) - assert.NotEmpty(m.t, value.Str()) + assert.False(m.t, found) + assert.Empty(m.t, value.Str()) value, found = md.ResourceMetrics().At(0).Resource().Attributes().Get("Sources") assert.True(m.t, found) @@ -66,6 +66,15 @@ func (m mockNextConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics assert.True(m.t, found) assert.Equal(m.t, "test-node", value.Str()) + assert.Equal(m.t, len(defaultResourceToType), md.ResourceMetrics().Len()) + assert.Equal(m.t, 1, md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len()) + + metric1 := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + assert.Equal(m.t, "apiserver_storage_objects", metric1.Name()) + assert.Equal(m.t, 123.4, metric1.Gauge().DataPoints().At(0).DoubleValue()) + + assert.Equal(m.t, 1, md.MetricCount()) + return nil } @@ -76,12 +85,28 @@ func TestPrometheusConsumeMetrics(t *testing.T) { } consumer := newPrometheusConsumer(zap.NewNop(), nextConsumer, "test-cluster", "test-node") + consumer.metricsToResource["invalid-metrics-to-resource"] = "invalid" cap := consumer.Capabilities() assert.True(t, cap.MutatesData) metrics := pmetric.NewMetrics() - metrics.ResourceMetrics().AppendEmpty() + 
metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric1 := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + metric1.SetName("apiserver_storage_objects") + metric1.SetEmptyGauge().DataPoints().AppendEmpty().SetDoubleValue(123.4) + + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() + metric2 := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1) + metric2.SetName("some_excluded_metric") + metric2.SetEmptyGauge().DataPoints().AppendEmpty().SetDoubleValue(456.7) + + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() + metric3 := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1) + metric3.SetName("invalid-metrics-to-resource") + metric3.SetEmptyGauge().DataPoints().AppendEmpty().SetDoubleValue(99.9) + + assert.Equal(t, 3, metrics.MetricCount()) result := consumer.ConsumeMetrics(context.TODO(), metrics) assert.NoError(t, result) From f8ffd079b3e60e333eb16d8ee9de951a1389dd5d Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Wed, 24 May 2023 15:27:00 -0500 Subject: [PATCH 02/17] add license to missing files. Minor tweaks --- .../internal/k8sapiserver/prometheus_consumer.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go index e0e159c23e7c..034fd5573a00 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go @@ -38,6 +38,7 @@ var ( "apiserver_request_duration_seconds": controlPlaneResourceType, "apiserver_admission_controller_admission_duration_seconds": controlPlaneResourceType, "rest_client_request_duration_seconds": controlPlaneResourceType, + "rest_client_requests_total": controlPlaneResourceType, "etcd_request_duration_seconds": controlPlaneResourceType, "etcd_db_total_size_in_bytes": controlPlaneResourceType, } @@ -53,8 +54,8 @@ type prometheusConsumer struct { metricsToResource map[string]string } -func newPrometheusConsumer(logger *zap.Logger, nextConsumer consumer.Metrics, clusterName string, nodeName string) prometheusConsumer { - return prometheusConsumer{ +func newPrometheusConsumer(logger *zap.Logger, nextConsumer consumer.Metrics, clusterName string, nodeName string) *prometheusConsumer { + return &prometheusConsumer{ logger: logger, nextConsumer: nextConsumer, clusterName: clusterName, From 721c33e1cdcc680596952fdbf74f6c830d114951 Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Wed, 24 May 2023 17:20:20 -0500 Subject: [PATCH 03/17] fix linter errors --- .../k8sapiserver/prometheus_consumer.go | 2 -- .../k8sapiserver/prometheus_scraper.go | 19 +++++-------------- .../k8sapiserver/prometheus_scraper_test.go | 4 ++-- .../awscontainerinsightreceiver/receiver.go | 2 +- 4 files changed, 8 insertions(+), 19 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go index 034fd5573a00..40b3bdfbb765 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go @@ -19,7 +19,6 @@ import ( "fmt" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/pcommon" 
"go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" ) @@ -49,7 +48,6 @@ type prometheusConsumer struct { logger *zap.Logger clusterName string nodeName string - resource pcommon.Resource resourcesToType map[string]string metricsToResource map[string]string } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper.go index 314db529abb6..2b7e7b6a7f4f 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper.go @@ -55,10 +55,6 @@ func NewPrometheusScraper(ctx context.Context, telemetrySettings component.Telem // get endpoint endpoint := k8sClient.GetClientSet().CoreV1().RESTClient().Get().AbsPath("/").URL().Hostname() - // TODO: test RBAC permissions? - - spFactory := simpleprometheusreceiver.NewFactory() // TODO: pass this in? - spConfig := simpleprometheusreceiver.Config{ HTTPClientSettings: confighttp.HTTPClientSettings{ Endpoint: endpoint, @@ -81,6 +77,7 @@ func NewPrometheusScraper(ctx context.Context, telemetrySettings component.Telem TelemetrySettings: telemetrySettings, } + spFactory := simpleprometheusreceiver.NewFactory() spr, err := spFactory.CreateMetricsReceiver(ctx, params, &spConfig, consumer) if err != nil { return nil, fmt.Errorf("failed to create simple prometheus receiver: %w", err) @@ -95,16 +92,10 @@ func NewPrometheusScraper(ctx context.Context, telemetrySettings component.Telem }, nil } -func (ps *PrometheusScraper) Start() { - err := ps.simplePrometheusReceiver.Start(ps.ctx, ps.host) - if err != nil { - panic("fix me when productionalizing code") - } +func (ps *PrometheusScraper) Start() error { + return ps.simplePrometheusReceiver.Start(ps.ctx, ps.host) } -func (ps *PrometheusScraper) Shutdown() { - err := ps.simplePrometheusReceiver.Shutdown(ps.ctx) - if err != nil { - panic("fix me when productionalizing code") - } +func (ps *PrometheusScraper) Shutdown() error { + return ps.simplePrometheusReceiver.Shutdown(ps.ctx) } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go index f63c2705591e..bc729e499335 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go @@ -135,10 +135,10 @@ func TestNewPrometheusScraperEndToEnd(t *testing.T) { assert.NotNil(t, mp) defer mp.Close() - scraper.Start() + assert.NoError(t, scraper.Start()) t.Cleanup(func() { - scraper.Shutdown() + assert.NoError(t, scraper.Shutdown()) }) // wait for scrape diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index ab8e9999c580..d660d5bdab92 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -143,7 +143,7 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone // Shutdown stops the awsContainerInsightReceiver receiver. 
func (acir *awsContainerInsightReceiver) Shutdown(context.Context) error { if acir.prometheusScraper != nil { - acir.prometheusScraper.Shutdown() + acir.prometheusScraper.Shutdown() //nolint:errcheck } if acir.cancel == nil { From 280b69d26b17a50442e0f2419029bf93916143b0 Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 30 May 2023 15:23:30 -0500 Subject: [PATCH 04/17] refactor leader election to be more common so multiple scrapers can utilize it --- .../awscontainerinsightreceiver/README.md | 2 + .../internal/k8sapiserver/k8sapiserver.go | 249 ++--------------- .../k8sapiserver/k8sapiserver_test.go | 60 ++--- .../internal/k8sapiserver/leaderelection.go | 253 ++++++++++++++++++ .../k8sapiserver/leaderelection_test.go | 84 ++++++ .../k8sapiserver/prometheus_scraper.go | 49 ++-- .../k8sapiserver/prometheus_scraper_test.go | 15 +- .../internal/leaderelection/configmaplock.go | 106 ++++++++ .../awscontainerinsightreceiver/receiver.go | 38 ++- 9 files changed, 558 insertions(+), 298 deletions(-) create mode 100644 receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go create mode 100644 receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go create mode 100644 receiver/awscontainerinsightreceiver/internal/leaderelection/configmaplock.go diff --git a/receiver/awscontainerinsightreceiver/README.md b/receiver/awscontainerinsightreceiver/README.md index e8cb9965c612..8e91b0226e0d 100644 --- a/receiver/awscontainerinsightreceiver/README.md +++ b/receiver/awscontainerinsightreceiver/README.md @@ -112,6 +112,8 @@ rules: resources: ["configmaps"] resourceNames: ["otel-container-insight-clusterleader"] verbs: ["get","update"] + - nonResourceURLs: ["/metrics"] + verbs: ["get", "list", "watch"] --- kind: ClusterRoleBinding diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index ae1ba132ffc3..618caaa3016c 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -20,76 +20,21 @@ import ( "fmt" "os" "strconv" - "sync" "time" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/client-go/tools/record" - "k8s.io/klog" ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" ) -// eventBroadcaster is adpated from record.EventBroadcaster -type eventBroadcaster interface { - // StartRecordingToSink starts sending events received from this EventBroadcaster to the given - // sink. The return value can be ignored or used to stop recording, if desired. - StartRecordingToSink(sink record.EventSink) watch.Interface - // StartLogging starts sending events received from this EventBroadcaster to the given logging - // function. The return value can be ignored or used to stop recording, if desired. 
- StartLogging(logf func(format string, args ...interface{})) watch.Interface - // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster - // with the event source set to the given event source. - NewRecorder(scheme *runtime.Scheme, source v1.EventSource) record.EventRecorder -} - -type K8sClient interface { - GetClientSet() kubernetes.Interface - GetEpClient() k8sclient.EpClient - GetNodeClient() k8sclient.NodeClient - GetPodClient() k8sclient.PodClient - GetDeploymentClient() k8sclient.DeploymentClient - GetDaemonSetClient() k8sclient.DaemonSetClient - ShutdownNodeClient() - ShutdownPodClient() - ShutdownDeploymentClient() - ShutdownDaemonSetClient() -} - // K8sAPIServer is a struct that produces metrics from kubernetes api server type K8sAPIServer struct { nodeName string // get the value from downward API logger *zap.Logger clusterNameProvider clusterNameProvider cancel context.CancelFunc - - mu sync.Mutex - leading bool - leaderLockName string - leaderLockUsingConfigMapOnly bool - - k8sClient K8sClient // *k8sclient.K8sClient - epClient k8sclient.EpClient - nodeClient k8sclient.NodeClient - podClient k8sclient.PodClient - deploymentClient k8sclient.DeploymentClient - daemonSetClient k8sclient.DaemonSetClient - - // the following can be set to mocks in testing - broadcaster eventBroadcaster - // the close of isLeadingC indicates the leader election is done. This is used in testing - isLeadingC chan bool + leaderElection *LeaderElection } type clusterNameProvider interface { @@ -98,37 +43,28 @@ type clusterNameProvider interface { type Option func(*K8sAPIServer) -func WithLeaderLockName(name string) Option { - return func(server *K8sAPIServer) { - server.leaderLockName = name - } -} - -func WithLeaderLockUsingConfigMapOnly(leaderLockUsingConfigMapOnly bool) Option { - return func(server *K8sAPIServer) { - server.leaderLockUsingConfigMapOnly = leaderLockUsingConfigMapOnly - } -} +// NewK8sAPIServer creates a k8sApiServer which can generate cluster-level metrics +func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, options ...Option) (*K8sAPIServer, error) { -// New creates a k8sApiServer which can generate cluster-level metrics -func New(clusterNameProvider clusterNameProvider, logger *zap.Logger, options ...Option) (*K8sAPIServer, error) { k := &K8sAPIServer{ logger: logger, - clusterNameProvider: clusterNameProvider, - k8sClient: k8sclient.Get(logger), - broadcaster: record.NewBroadcaster(), + clusterNameProvider: cnp, + leaderElection: leaderElection, } for _, opt := range options { opt(k) } - if k.k8sClient == nil { - return nil, errors.New("failed to start k8sapiserver because k8sclient is nil") + if k.leaderElection == nil { + return nil, errors.New("cannot start k8sapiserver, leader election is nil") } - if err := k.init(); err != nil { - return nil, fmt.Errorf("fail to initialize k8sapiserver, err: %w", err) + _, k.cancel = context.WithCancel(context.Background()) + + k.nodeName = os.Getenv("HOST_NAME") + if k.nodeName == "" { + return nil, errors.New("environment variable HOST_NAME is not set in k8s deployment config") } return k, nil @@ -139,9 +75,7 @@ func (k *K8sAPIServer) GetMetrics() []pmetric.Metrics { var result []pmetric.Metrics // don't generate any metrics if the current collector is not the leader - k.mu.Lock() - defer k.mu.Unlock() - if !k.leading { + if !k.leaderElection.leading { return result } @@ -166,8 +100,8 @@ func (k *K8sAPIServer) GetMetrics() []pmetric.Metrics { func (k 
*K8sAPIServer) getClusterMetrics(clusterName, timestampNs string) pmetric.Metrics { fields := map[string]interface{}{ - "cluster_failed_node_count": k.nodeClient.ClusterFailedNodeCount(), - "cluster_node_count": k.nodeClient.ClusterNodeCount(), + "cluster_failed_node_count": k.leaderElection.nodeClient.ClusterFailedNodeCount(), + "cluster_node_count": k.leaderElection.nodeClient.ClusterNodeCount(), } attributes := map[string]string{ ci.ClusterNameKey: clusterName, @@ -184,7 +118,7 @@ func (k *K8sAPIServer) getClusterMetrics(clusterName, timestampNs string) pmetri func (k *K8sAPIServer) getNamespaceMetrics(clusterName, timestampNs string) []pmetric.Metrics { var metrics []pmetric.Metrics - for namespace, podNum := range k.podClient.NamespaceToRunningPodNum() { + for namespace, podNum := range k.leaderElection.podClient.NamespaceToRunningPodNum() { fields := map[string]interface{}{ "namespace_number_of_running_pods": podNum, } @@ -208,7 +142,7 @@ func (k *K8sAPIServer) getNamespaceMetrics(clusterName, timestampNs string) []pm func (k *K8sAPIServer) getDeploymentMetrics(clusterName, timestampNs string) []pmetric.Metrics { var metrics []pmetric.Metrics - deployments := k.deploymentClient.DeploymentInfos() + deployments := k.leaderElection.deploymentClient.DeploymentInfos() for _, deployment := range deployments { fields := map[string]interface{}{ ci.MetricName(ci.TypeClusterDeployment, ci.SpecReplicas): deployment.Spec.Replicas, // deployment_spec_replicas @@ -238,7 +172,7 @@ func (k *K8sAPIServer) getDeploymentMetrics(clusterName, timestampNs string) []p func (k *K8sAPIServer) getDaemonSetMetrics(clusterName, timestampNs string) []pmetric.Metrics { var metrics []pmetric.Metrics - daemonSets := k.daemonSetClient.DaemonSetInfos() + daemonSets := k.leaderElection.daemonSetClient.DaemonSetInfos() for _, daemonSet := range daemonSets { fields := map[string]interface{}{ ci.MetricName(ci.TypeClusterDaemonSet, ci.StatusNumberAvailable): daemonSet.Status.NumberAvailable, // daemonset_status_number_available @@ -268,7 +202,7 @@ func (k *K8sAPIServer) getDaemonSetMetrics(clusterName, timestampNs string) []pm func (k *K8sAPIServer) getServiceMetrics(clusterName, timestampNs string) []pmetric.Metrics { var metrics []pmetric.Metrics - for service, podNum := range k.epClient.ServiceToPodNum() { + for service, podNum := range k.leaderElection.epClient.ServiceToPodNum() { fields := map[string]interface{}{ "service_number_of_running_pods": podNum, } @@ -292,154 +226,9 @@ func (k *K8sAPIServer) getServiceMetrics(clusterName, timestampNs string) []pmet return metrics } -func (k *K8sAPIServer) init() error { - var ctx context.Context - ctx, k.cancel = context.WithCancel(context.Background()) - - k.nodeName = os.Getenv("HOST_NAME") - if k.nodeName == "" { - return errors.New("environment variable HOST_NAME is not set in k8s deployment config") - } - - lockNamespace := os.Getenv("K8S_NAMESPACE") - if lockNamespace == "" { - return errors.New("environment variable K8S_NAMESPACE is not set in k8s deployment config") - } - - resourceLockConfig := resourcelock.ResourceLockConfig{ - Identity: k.nodeName, - EventRecorder: k.createRecorder(k.leaderLockName, lockNamespace), - } - - clientSet := k.k8sClient.GetClientSet() - configMapInterface := clientSet.CoreV1().ConfigMaps(lockNamespace) - if configMap, err := configMapInterface.Get(ctx, k.leaderLockName, metav1.GetOptions{}); configMap == nil || err != nil { - k.logger.Info(fmt.Sprintf("Cannot get the leader config map: %v, try to create the config map...", err)) - 
configMap, err = configMapInterface.Create(ctx, - &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: lockNamespace, - Name: k.leaderLockName, - }, - }, metav1.CreateOptions{}) - k.logger.Info(fmt.Sprintf("configMap: %v, err: %v", configMap, err)) - } - - var lock resourcelock.Interface - if k.leaderLockUsingConfigMapOnly { - lock = &ConfigMapLock{ - ConfigMapMeta: metav1.ObjectMeta{ - Namespace: lockNamespace, - Name: k.leaderLockName, - }, - Client: clientSet.CoreV1(), - LockConfig: resourceLockConfig, - } - } else { - l, err := resourcelock.New( - resourcelock.ConfigMapsLeasesResourceLock, - lockNamespace, k.leaderLockName, - clientSet.CoreV1(), - clientSet.CoordinationV1(), - resourceLockConfig) - if err != nil { - k.logger.Warn("Failed to create resource lock", zap.Error(err)) - return err - } - lock = l - } - - go k.startLeaderElection(ctx, lock) - - return nil -} - // Shutdown stops the k8sApiServer func (k *K8sAPIServer) Shutdown() { if k.cancel != nil { k.cancel() } } - -func (k *K8sAPIServer) startLeaderElection(ctx context.Context, lock resourcelock.Interface) { - - for { - leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ - Lock: lock, - // IMPORTANT: you MUST ensure that any code you have that - // is protected by the lease must terminate **before** - // you call cancel. Otherwise, you could have a background - // loop still running and another process could - // get elected before your background loop finished, violating - // the stated goal of the lease. - LeaseDuration: 60 * time.Second, - RenewDeadline: 15 * time.Second, - RetryPeriod: 5 * time.Second, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(ctx context.Context) { - k.logger.Info(fmt.Sprintf("k8sapiserver OnStartedLeading: %s", k.nodeName)) - // we're notified when we start - k.mu.Lock() - k.leading = true - // always retrieve clients in case previous ones shut down during leader switching - k.nodeClient = k.k8sClient.GetNodeClient() - k.podClient = k.k8sClient.GetPodClient() - k.epClient = k.k8sClient.GetEpClient() - k.deploymentClient = k.k8sClient.GetDeploymentClient() - k.daemonSetClient = k.k8sClient.GetDaemonSetClient() - k.mu.Unlock() - - if k.isLeadingC != nil { - // this executes only in testing - close(k.isLeadingC) - } - - for { - k.mu.Lock() - leading := k.leading - k.mu.Unlock() - if !leading { - k.logger.Info("no longer leading") - return - } - select { - case <-ctx.Done(): - k.logger.Info("ctx cancelled") - return - case <-time.After(time.Second): - } - } - }, - OnStoppedLeading: func() { - k.logger.Info(fmt.Sprintf("k8sapiserver OnStoppedLeading: %s", k.nodeName)) - // we can do cleanup here, or after the RunOrDie method returns - k.mu.Lock() - defer k.mu.Unlock() - k.leading = false - // The following are only used for cluster level metrics, whereas endpoint is used for decorator too. 
- k.k8sClient.ShutdownNodeClient() - k.k8sClient.ShutdownPodClient() - k.k8sClient.ShutdownDeploymentClient() - k.k8sClient.ShutdownDaemonSetClient() - }, - OnNewLeader: func(identity string) { - k.logger.Info(fmt.Sprintf("k8sapiserver Switch New Leader: %s", identity)) - }, - }, - }) - - select { - case <-ctx.Done(): // when leader election ends, the channel ctx.Done() will be closed - k.logger.Info(fmt.Sprintf("k8sapiserver shutdown Leader Election: %s", k.nodeName)) - return - default: - } - } -} - -func (k *K8sAPIServer) createRecorder(name, namespace string) record.EventRecorder { - k.broadcaster.StartLogging(klog.Infof) - clientSet := k.k8sClient.GetClientSet() - k.broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: corev1.New(clientSet.CoreV1().RESTClient()).Events(namespace)}) - return k.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name}) -} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go index ac6a43f2d28b..847aa1ec6bcd 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go @@ -17,7 +17,6 @@ package k8sapiserver import ( "fmt" "os" - "strings" "testing" "github.com/stretchr/testify/assert" @@ -189,10 +188,7 @@ func (m mockClusterNameProvider) GetClusterName() string { } func TestK8sAPIServer_New(t *testing.T) { - k8sClientOption := func(k *K8sAPIServer) { - k.k8sClient = nil - } - k8sAPIServer, err := New(mockClusterNameProvider{}, zap.NewNop(), k8sClientOption) + k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), nil) assert.Nil(t, k8sAPIServer) assert.NotNil(t, err) } @@ -200,26 +196,6 @@ func TestK8sAPIServer_New(t *testing.T) { func TestK8sAPIServer_GetMetrics(t *testing.T) { hostName, err := os.Hostname() assert.NoError(t, err) - k8sClientOption := func(k *K8sAPIServer) { - k.k8sClient = &mockK8sClient{} - } - leadingOption := func(k *K8sAPIServer) { - k.leading = true - } - broadcasterOption := func(k *K8sAPIServer) { - k.broadcaster = &mockEventBroadcaster{} - } - isLeadingCOption := func(k *K8sAPIServer) { - k.isLeadingC = make(chan bool) - } - - t.Setenv("HOST_NAME", hostName) - t.Setenv("K8S_NAMESPACE", "namespace") - k8sAPIServer, err := New(mockClusterNameProvider{}, zap.NewNop(), k8sClientOption, - leadingOption, broadcasterOption, isLeadingCOption) - - assert.NotNil(t, k8sAPIServer) - assert.Nil(t, err) mockClient.On("NamespaceToRunningPodNum").Return(map[string]int{"default": 2}) mockClient.On("ClusterFailedNodeCount").Return(1) @@ -257,7 +233,25 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { }, }) - <-k8sAPIServer.isLeadingC + leaderElection := &LeaderElection{ + k8sClient: &mockK8sClient{}, + nodeClient: mockClient, + epClient: mockClient, + podClient: mockClient, + deploymentClient: mockClient, + daemonSetClient: mockClient, + leading: true, + broadcaster: &mockEventBroadcaster{}, + isLeadingC: make(chan bool), + } + + t.Setenv("HOST_NAME", hostName) + //t.Setenv("K8S_NAMESPACE", "namespace") + k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), leaderElection) + + assert.NotNil(t, k8sAPIServer) + assert.Nil(t, err) + metrics := k8sAPIServer.GetMetrics() assert.NoError(t, err) @@ -306,17 +300,3 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { k8sAPIServer.Shutdown() } - -func TestK8sAPIServer_init(t *testing.T) { - 
k8sAPIServer := &K8sAPIServer{} - - err := k8sAPIServer.init() - assert.NotNil(t, err) - assert.True(t, strings.HasPrefix(err.Error(), "environment variable HOST_NAME is not set")) - - t.Setenv("HOST_NAME", "hostname") - - err = k8sAPIServer.init() - assert.NotNil(t, err) - assert.True(t, strings.HasPrefix(err.Error(), "environment variable K8S_NAMESPACE is not set")) -} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go new file mode 100644 index 000000000000..7d9ad3da2a0f --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go @@ -0,0 +1,253 @@ +package k8sapiserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8sapiserver" + +import ( + "context" + "errors" + "fmt" + "os" + "sync" + "time" + + "go.uber.org/zap" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" + "k8s.io/klog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" +) + +type LeaderElection struct { + logger *zap.Logger + cancel context.CancelFunc + nodeName string + + mu sync.Mutex + leading bool + leaderLockName string + leaderLockUsingConfigMapOnly bool + + k8sClient K8sClient // *k8sclient.K8sClient + epClient k8sclient.EpClient + nodeClient k8sclient.NodeClient + podClient k8sclient.PodClient + deploymentClient k8sclient.DeploymentClient + daemonSetClient k8sclient.DaemonSetClient + + // the following can be set to mocks in testing + broadcaster eventBroadcaster + // the close of isLeadingC indicates the leader election is done. This is used in testing + isLeadingC chan bool +} + +type eventBroadcaster interface { + // StartRecordingToSink starts sending events received from this EventBroadcaster to the given + // sink. The return value can be ignored or used to stop recording, if desired. + StartRecordingToSink(sink record.EventSink) watch.Interface + // StartLogging starts sending events received from this EventBroadcaster to the given logging + // function. The return value can be ignored or used to stop recording, if desired. + StartLogging(logf func(format string, args ...interface{})) watch.Interface + // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster + // with the event source set to the given event source. 
+ NewRecorder(scheme *runtime.Scheme, source v1.EventSource) record.EventRecorder +} + +type K8sClient interface { + GetClientSet() kubernetes.Interface + GetEpClient() k8sclient.EpClient + GetNodeClient() k8sclient.NodeClient + GetPodClient() k8sclient.PodClient + GetDeploymentClient() k8sclient.DeploymentClient + GetDaemonSetClient() k8sclient.DaemonSetClient + ShutdownNodeClient() + ShutdownPodClient() +} + +type LeaderElectionOption func(*LeaderElection) + +func WithLeaderLockName(name string) LeaderElectionOption { + return func(le *LeaderElection) { + le.leaderLockName = name + } +} + +func WithLeaderLockUsingConfigMapOnly(leaderLockUsingConfigMapOnly bool) LeaderElectionOption { + return func(le *LeaderElection) { + le.leaderLockUsingConfigMapOnly = leaderLockUsingConfigMapOnly + } +} + +func NewLeaderElection(logger *zap.Logger, options ...LeaderElectionOption) (*LeaderElection, error) { + le := &LeaderElection{ + logger: logger, + k8sClient: k8sclient.Get(logger), + broadcaster: record.NewBroadcaster(), + } + + for _, opt := range options { + opt(le) + } + + if le.k8sClient == nil { + return nil, errors.New("failed to perform leaderelection because k8sclient is nil") + } + + if err := le.init(); err != nil { + return nil, err + } + + return le, nil +} + +func (le *LeaderElection) init() error { + var ctx context.Context + ctx, le.cancel = context.WithCancel(context.Background()) + + le.nodeName = os.Getenv("HOST_NAME") + if le.nodeName == "" { + return errors.New("environment variable HOST_NAME is not set in k8s deployment config") + } + + lockNamespace := os.Getenv("K8S_NAMESPACE") + if lockNamespace == "" { + return errors.New("environment variable K8S_NAMESPACE is not set in k8s deployment config") + } + + resourceLockConfig := resourcelock.ResourceLockConfig{ + Identity: le.nodeName, + EventRecorder: le.createRecorder(le.leaderLockName, lockNamespace), + } + + clientSet := le.k8sClient.GetClientSet() + configMapInterface := clientSet.CoreV1().ConfigMaps(lockNamespace) + configMap, err := configMapInterface.Get(ctx, le.leaderLockName, metav1.GetOptions{}) + if configMap == nil || err != nil { + le.logger.Info(fmt.Sprintf("Cannot get the leader config map: %v, try to create the config map...", err)) + configMap, err = configMapInterface.Create(ctx, + &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: lockNamespace, + Name: le.leaderLockName, + }, + }, metav1.CreateOptions{}) + le.logger.Info(fmt.Sprintf("configMap: %v, err: %v", configMap, err)) + } + + le.logger.Info(fmt.Sprintf("configMap: %v, err: %v", configMap, err)) + + var lock resourcelock.Interface + if le.leaderLockUsingConfigMapOnly { + lock = &ConfigMapLock{ + ConfigMapMeta: metav1.ObjectMeta{ + Namespace: lockNamespace, + Name: le.leaderLockName, + }, + Client: clientSet.CoreV1(), + LockConfig: resourceLockConfig, + } + } else { + l, err := resourcelock.New( + resourcelock.ConfigMapsLeasesResourceLock, + lockNamespace, le.leaderLockName, + clientSet.CoreV1(), + clientSet.CoordinationV1(), + resourceLockConfig) + if err != nil { + le.logger.Warn("Failed to create resource lock", zap.Error(err)) + return err + } + lock = l + } + + go le.startLeaderElection(ctx, lock) + + return nil +} + +func (le *LeaderElection) startLeaderElection(ctx context.Context, lock resourcelock.Interface) { + + for { + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + // IMPORTANT: you MUST ensure that any code you have that + // is protected by the lease must terminate **before** + // you call 
cancel. Otherwise, you could have a background + // loop still running and another process could + // get elected before your background loop finished, violating + // the stated goal of the lease. + LeaseDuration: 60 * time.Second, + RenewDeadline: 15 * time.Second, + RetryPeriod: 5 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + le.logger.Info(fmt.Sprintf("OnStartedLeading: %s", le.nodeName)) + // we're notified when we start + le.mu.Lock() + le.leading = true + // always retrieve clients in case previous ones shut down during leader switching + le.nodeClient = le.k8sClient.GetNodeClient() + le.podClient = le.k8sClient.GetPodClient() + le.epClient = le.k8sClient.GetEpClient() + le.deploymentClient = le.k8sClient.GetDeploymentClient() + le.daemonSetClient = le.k8sClient.GetDaemonSetClient() + le.mu.Unlock() + + if le.isLeadingC != nil { + // this executes only in testing + close(le.isLeadingC) + } + + for { + le.mu.Lock() + leading := le.leading + le.mu.Unlock() + if !leading { + le.logger.Info("no longer leading") + return + } + select { + case <-ctx.Done(): + le.logger.Info("ctx cancelled") + return + case <-time.After(time.Second): + } + } + }, + OnStoppedLeading: func() { + le.logger.Info(fmt.Sprintf("OnStoppedLeading: %s", le.nodeName)) + // we can do cleanup here, or after the RunOrDie method returns + le.mu.Lock() + defer le.mu.Unlock() + le.leading = false + // node and pod are only used for cluster level metrics, endpoint is used for decorator too. + le.k8sClient.ShutdownNodeClient() + le.k8sClient.ShutdownPodClient() + }, + OnNewLeader: func(identity string) { + le.logger.Info(fmt.Sprintf("Switch NewLeaderElection Leader: %s", identity)) + }, + }, + }) + + select { + case <-ctx.Done(): // when leader election ends, the channel ctx.Done() will be closed + le.logger.Info(fmt.Sprintf("shutdown Leader Election: %s", le.nodeName)) + return + default: + } + } +} + +func (le *LeaderElection) createRecorder(name, namespace string) record.EventRecorder { + le.broadcaster.StartLogging(klog.Infof) + clientSet := le.k8sClient.GetClientSet() + le.broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: corev1.New(clientSet.CoreV1().RESTClient()).Events(namespace)}) + return le.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name}) +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go new file mode 100644 index 000000000000..0b576e431f1d --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go @@ -0,0 +1,84 @@ +package k8sapiserver + +import ( + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" +) + +func TestNewLeaderElectionOpts(t *testing.T) { + t.Setenv("HOST_NAME", "hostname") + t.Setenv("K8S_NAMESPACE", "namespace") + + le, err := NewLeaderElection(zap.NewNop(), WithLeaderLockName("test"), WithLeaderLockUsingConfigMapOnly(true)) + assert.NotNil(t, le) + assert.NoError(t, err) + assert.Equal(t, "test", le.leaderLockName) + assert.True(t, le.leaderLockUsingConfigMapOnly) + +} +func TestLeaderElectionInitErrors(t *testing.T) { + le, err := NewLeaderElection(zap.NewNop()) + assert.Error(t, err) + assert.True(t, strings.HasPrefix(err.Error(), "environment variable HOST_NAME is not set")) + 
assert.Nil(t, le) + + t.Setenv("HOST_NAME", "hostname") + + le, err = NewLeaderElection(zap.NewNop()) + assert.Error(t, err) + assert.True(t, strings.HasPrefix(err.Error(), "environment variable K8S_NAMESPACE is not set")) + assert.Nil(t, le) +} + +func TestLeaderElectionEndToEnd(t *testing.T) { + hostName, err := os.Hostname() + assert.NoError(t, err) + k8sclientOption := func(le *LeaderElection) { + le.k8sClient = &mockK8sClient{} + } + leadingOption := func(le *LeaderElection) { + le.leading = true + } + broadcasterOption := func(le *LeaderElection) { + le.broadcaster = &mockEventBroadcaster{} + } + isLeadingCOption := func(le *LeaderElection) { + le.isLeadingC = make(chan bool) + } + + t.Setenv("HOST_NAME", hostName) + t.Setenv("K8S_NAMESPACE", "namespace") + leaderElection, err := NewLeaderElection(zap.NewNop(), k8sclientOption, + leadingOption, broadcasterOption, isLeadingCOption) + + assert.NotNil(t, leaderElection) + assert.NoError(t, err) + + mockClient.On("NamespaceToRunningPodNum").Return(map[string]int{"default": 2}) + mockClient.On("ClusterFailedNodeCount").Return(1) + mockClient.On("ClusterNodeCount").Return(1) + mockClient.On("ServiceToPodNum").Return( + map[k8sclient.Service]int{ + NewService("service1", "kube-system"): 1, + NewService("service2", "kube-system"): 1, + }, + ) + + <-leaderElection.isLeadingC + assert.True(t, leaderElection.leading) + + // shut down + leaderElection.cancel() + + assert.Eventually(t, func() bool { + return !leaderElection.leading + }, 2*time.Second, 5*time.Millisecond) + +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper.go index 2b7e7b6a7f4f..7e8595443037 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper.go @@ -16,7 +16,6 @@ package k8sapiserver // import "github.com/open-telemetry/opentelemetry-collecto import ( "context" - "errors" "fmt" "os" "time" @@ -25,9 +24,10 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver" ) @@ -42,19 +42,11 @@ type PrometheusScraper struct { host component.Host clusterNameProvider clusterNameProvider simplePrometheusReceiver receiver.Metrics + leaderElection *LeaderElection + running bool } -func NewPrometheusScraper(ctx context.Context, telemetrySettings component.TelemetrySettings, nextConsumer consumer.Metrics, host component.Host, clusterNameProvider clusterNameProvider) (*PrometheusScraper, error) { - // TODO: need leader election - - k8sClient := k8sclient.Get(telemetrySettings.Logger) - if k8sClient == nil { - return nil, errors.New("failed to start k8sapiserver because k8sclient is nil") - } - - // get endpoint - endpoint := k8sClient.GetClientSet().CoreV1().RESTClient().Get().AbsPath("/").URL().Hostname() - +func NewPrometheusScraper(ctx context.Context, telemetrySettings component.TelemetrySettings, endpoint string, nextConsumer consumer.Metrics, host component.Host, clusterNameProvider clusterNameProvider, leaderElection *LeaderElection) (*PrometheusScraper, error) { 
spConfig := simpleprometheusreceiver.Config{ HTTPClientSettings: confighttp.HTTPClientSettings{ Endpoint: endpoint, @@ -89,13 +81,34 @@ func NewPrometheusScraper(ctx context.Context, telemetrySettings component.Telem host: host, clusterNameProvider: clusterNameProvider, simplePrometheusReceiver: spr, + leaderElection: leaderElection, }, nil } -func (ps *PrometheusScraper) Start() error { - return ps.simplePrometheusReceiver.Start(ps.ctx, ps.host) -} +func (ps *PrometheusScraper) GetMetrics() []pmetric.Metrics { + // This method will never return metrics because the metrics are collected by the scraper. + // This method will ensure the scraper is running + if !ps.leaderElection.leading { + return nil + } -func (ps *PrometheusScraper) Shutdown() error { - return ps.simplePrometheusReceiver.Shutdown(ps.ctx) + // if we are leading, ensure we are running + if !ps.running { + ps.settings.Logger.Info("The scraper is not running, starting up the scraper") + err := ps.simplePrometheusReceiver.Start(ps.ctx, ps.host) + if err != nil { + ps.settings.Logger.Error("Unable to start SimplePrometheusReceiver", zap.Error(err)) + } + ps.running = err == nil + } + return nil +} +func (ps *PrometheusScraper) Shutdown() { + if ps.running { + err := ps.simplePrometheusReceiver.Shutdown(ps.ctx) + if err != nil { + ps.settings.Logger.Error("Unable to shutdown SimplePrometheusReceiver", zap.Error(err)) + } + ps.running = err != nil + } } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go index bc729e499335..cb850a657a34 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go @@ -93,7 +93,10 @@ func TestNewPrometheusScraperEndToEnd(t *testing.T) { settings := componenttest.NewNopTelemetrySettings() settings.Logger, _ = zap.NewDevelopment() - scraper, err := NewPrometheusScraper(context.TODO(), settings, consumer, componenttest.NewNopHost(), mockClusterNameProvider{}) + leaderElection := LeaderElection{ + leading: true, + } + scraper, err := NewPrometheusScraper(context.TODO(), settings, "", consumer, componenttest.NewNopHost(), mockClusterNameProvider{}, &leaderElection) assert.NoError(t, err) assert.Equal(t, mockClusterNameProvider{}, scraper.clusterNameProvider) @@ -135,13 +138,17 @@ func TestNewPrometheusScraperEndToEnd(t *testing.T) { assert.NotNil(t, mp) defer mp.Close() - assert.NoError(t, scraper.Start()) + // perform an async scrape, this will kick off the scraper process + go func() { + scraper.GetMetrics() + }() t.Cleanup(func() { - assert.NoError(t, scraper.Shutdown()) + scraper.Shutdown() }) - // wait for scrape + // wait for 2 scrapes, one initiated by us, another by the new scraper process + mp.wg.Wait() mp.wg.Wait() assert.True(t, *consumer.up) diff --git a/receiver/awscontainerinsightreceiver/internal/leaderelection/configmaplock.go b/receiver/awscontainerinsightreceiver/internal/leaderelection/configmaplock.go new file mode 100644 index 000000000000..c8a106dbf2fd --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/leaderelection/configmaplock.go @@ -0,0 +1,106 @@ +package leaderelection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/leaderelection" + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + v1 "k8s.io/api/core/v1" + metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/leaderelection/resourcelock" +) + +type ConfigMapLock struct { + // ConfigMapMeta should contain a Name and a Namespace of a + // ConfigMapMeta object that the LeaderElector will attempt to lead. + ConfigMapMeta metav1.ObjectMeta + Client corev1client.ConfigMapsGetter + LockConfig resourcelock.ResourceLockConfig + cm *v1.ConfigMap +} + +// Get returns the election record from a ConfigMap Annotation +func (cml *ConfigMapLock) Get(ctx context.Context) (*resourcelock.LeaderElectionRecord, []byte, error) { + var record resourcelock.LeaderElectionRecord + var err error + cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{}) + if err != nil { + return nil, nil, err + } + if cml.cm.Annotations == nil { + cml.cm.Annotations = make(map[string]string) + } + recordStr, found := cml.cm.Annotations[resourcelock.LeaderElectionRecordAnnotationKey] + recordBytes := []byte(recordStr) + if found { + if err := json.Unmarshal(recordBytes, &record); err != nil { + return nil, nil, err + } + } + return &record, recordBytes, nil +} + +// Create attempts to create a LeaderElectionRecord annotation +func (cml *ConfigMapLock) Create(ctx context.Context, ler resourcelock.LeaderElectionRecord) error { + recordBytes, err := json.Marshal(ler) + if err != nil { + return err + } + cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cml.ConfigMapMeta.Name, + Namespace: cml.ConfigMapMeta.Namespace, + Annotations: map[string]string{ + resourcelock.LeaderElectionRecordAnnotationKey: string(recordBytes), + }, + }, + }, metav1.CreateOptions{}) + return err +} + +// Update will update an existing annotation on a given resource. 
+func (cml *ConfigMapLock) Update(ctx context.Context, ler resourcelock.LeaderElectionRecord) error { + if cml.cm == nil { + return errors.New("configmap not initialized, call get or create first") + } + recordBytes, err := json.Marshal(ler) + if err != nil { + return err + } + if cml.cm.Annotations == nil { + cml.cm.Annotations = make(map[string]string) + } + cml.cm.Annotations[resourcelock.LeaderElectionRecordAnnotationKey] = string(recordBytes) + cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{}) + if err != nil { + return err + } + cml.cm = cm + return nil +} + +// RecordEvent in leader election while adding meta-data +func (cml *ConfigMapLock) RecordEvent(s string) { + if cml.LockConfig.EventRecorder == nil { + return + } + events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s) + subject := &v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta} + // Populate the type meta, so we don't have to get it from the schema + subject.Kind = "ConfigMap" + subject.APIVersion = v1.SchemeGroupVersion.String() + cml.LockConfig.EventRecorder.Eventf(subject, v1.EventTypeNormal, "LeaderElection", events) +} + +// Describe is used to convert details on current resource lock +// into a string +func (cml *ConfigMapLock) Describe() string { + return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name) +} + +// Identity returns the Identity of the lock +func (cml *ConfigMapLock) Identity() string { + return cml.LockConfig.Identity +} diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index d660d5bdab92..0fc53f25688f 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor" ecsinfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/ecsInfo" hostInfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" @@ -87,19 +88,29 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone if err != nil { return err } - acir.k8sapiserver, err = k8sapiserver.New(hostinfo, acir.settings.Logger, k8sapiserver.WithLeaderLockName(acir.config.LeaderLockName), + + leaderElection, err := k8sapiserver.NewLeaderElection(acir.settings.Logger, k8sapiserver.WithLeaderLockName(acir.config.LeaderLockName), k8sapiserver.WithLeaderLockUsingConfigMapOnly(acir.config.LeaderLockUsingConfigMapOnly)) if err != nil { return err } - /* TODO: enable this via config - acir.prometheusScraper, err = k8sapiserver.NewPrometheusScraper(ctx, acir.settings, acir.nextConsumer, host, hostinfo) + acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostinfo, acir.settings.Logger, leaderElection) if err != nil { - acir.settings.Logger.Error("Unable to start the prometheus scraper", zap.Error(err)) + return err } - acir.prometheusScraper.Start() - */ + + // TODO: enable this via config + /*endpoint, err := acir.getK8sApiServerEndpoint() + if err != nil { + acir.settings.Logger.Error("Unable to start the prometheus scraper", zap.Error(err)) + } else { + // use the same leader + acir.prometheusScraper, 
err = k8sapiserver.NewPrometheusScraper(ctx, acir.settings, endpoint, acir.nextConsumer, host, hostinfo, leaderElection) + if err != nil { + acir.settings.Logger.Error("Unable to start the prometheus scraper", zap.Error(err)) + } + }*/ } if acir.config.ContainerOrchestrator == ci.ECS { @@ -170,6 +181,11 @@ func (acir *awsContainerInsightReceiver) collectData(ctx context.Context) error mds = append(mds, acir.k8sapiserver.GetMetrics()...) } + if acir.prometheusScraper != nil { + // this does not return any metrics, it just indirectly ensures scraping is running on a leader + acir.prometheusScraper.GetMetrics() //nolint:errcheck + } + for _, md := range mds { err := acir.nextConsumer.ConsumeMetrics(ctx, md) if err != nil { @@ -179,3 +195,13 @@ func (acir *awsContainerInsightReceiver) collectData(ctx context.Context) error return nil } + +func (acir *awsContainerInsightReceiver) getK8sApiServerEndpoint() (string, error) { + k8sClient := k8sclient.Get(acir.settings.Logger) + if k8sClient == nil { + return "", errors.New("cannot start k8s client, unable to find K8sApiServer endpoint") + } + endpoint := k8sClient.GetClientSet().CoreV1().RESTClient().Get().AbsPath("/").URL().Hostname() + + return endpoint, nil +} From ee7f1109298ddabd4a75b81723eb1725afeec396 Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 30 May 2023 15:37:54 -0500 Subject: [PATCH 05/17] remove comment --- .../internal/k8sapiserver/k8sapiserver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go index 847aa1ec6bcd..89fbdbce3a04 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go @@ -246,7 +246,7 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { } t.Setenv("HOST_NAME", hostName) - //t.Setenv("K8S_NAMESPACE", "namespace") + t.Setenv("K8S_NAMESPACE", "namespace") k8sAPIServer, err := NewK8sAPIServer(mockClusterNameProvider{}, zap.NewNop(), leaderElection) assert.NotNil(t, k8sAPIServer) From 7b6ce674ff492992d9ded78e3fdea5087059672d Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 30 May 2023 16:22:05 -0500 Subject: [PATCH 06/17] add documentation, add license --- .../awscontainerinsightreceiver/README.md | 35 ++++++++++++++++--- .../internal/k8sapiserver/leaderelection.go | 14 ++++++++ .../k8sapiserver/leaderelection_test.go | 14 ++++++++ 3 files changed, 59 insertions(+), 4 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/README.md b/receiver/awscontainerinsightreceiver/README.md index 8e91b0226e0d..fb0675792d0b 100644 --- a/receiver/awscontainerinsightreceiver/README.md +++ b/receiver/awscontainerinsightreceiver/README.md @@ -340,10 +340,19 @@ kubectl apply -f config.yaml ## Available Metrics and Resource Attributes ### Cluster -| Metric | Unit | -|---------------------------|-------| -| cluster_failed_node_count | Count | -| cluster_node_count | Count | +| Metric | Unit | +|------------------------------------------------------------|---------| +| cluster_failed_node_count | Count | +| cluster_node_count | Count | +| apiserver_storage_objects | Count | +| apiserver_request_total | Count | +| apiserver_request_duration_seconds | Seconds | +| apiserver_admission_controller_admission_duration_seconds | Seconds | +| rest_client_request_duration_seconds | Seconds | +| rest_client_requests_total | Count | +| 
etcd_request_duration_seconds | Seconds | + +

| Resource Attribute | @@ -399,6 +408,24 @@ kubectl apply -f config.yaml

+ +### Cluster Endpoint +| Metric | Unit | +|-----------------------------|-------| +| etcd_db_total_size_in_bytes | bytes | + + +

+| Resource Attribute | +|--------------------| +| ClusterName | +| NodeName | +| Endpoint | +| Type | +| Timestamp | +| Version | +| Sources | +

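The `Endpoint` attribute above is presumably the Kubernetes API server host that the scraper targets. As a rough standalone illustration (assuming in-cluster credentials and plain client-go rather than the receiver's own k8sclient wrapper), the host can be derived the same way the patch's `getK8sAPIServerEndpoint` helper does:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// apiServerHost derives the API server hostname from the in-cluster client
// configuration, mirroring the AbsPath("/").URL().Hostname() lookup used by
// the receiver's getK8sAPIServerEndpoint helper.
func apiServerHost() (string, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return "", err
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return "", err
	}
	return clientset.CoreV1().RESTClient().Get().AbsPath("/").URL().Hostname(), nil
}

func main() {
	host, err := apiServerHost()
	if err != nil {
		fmt.Println("not running in a cluster:", err)
		return
	}
	fmt.Println(host)
}
```

Outside a pod this lookup fails, which is consistent with the receiver treating a failed endpoint lookup as a logged error rather than a fatal startup error.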
### Cluster Deployment diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go index 7d9ad3da2a0f..52dcc95e9226 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go @@ -1,3 +1,17 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package k8sapiserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8sapiserver" import ( diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go index 0b576e431f1d..5dccc2b172bf 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go @@ -1,3 +1,17 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package k8sapiserver import ( From 4fec362a90257692dc5a877a2464837d70eceeba Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 30 May 2023 16:41:29 -0500 Subject: [PATCH 07/17] remove unused file --- .../internal/leaderelection/configmaplock.go | 106 ------------------ 1 file changed, 106 deletions(-) delete mode 100644 receiver/awscontainerinsightreceiver/internal/leaderelection/configmaplock.go diff --git a/receiver/awscontainerinsightreceiver/internal/leaderelection/configmaplock.go b/receiver/awscontainerinsightreceiver/internal/leaderelection/configmaplock.go deleted file mode 100644 index c8a106dbf2fd..000000000000 --- a/receiver/awscontainerinsightreceiver/internal/leaderelection/configmaplock.go +++ /dev/null @@ -1,106 +0,0 @@ -package leaderelection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/leaderelection" - -import ( - "context" - "encoding/json" - "errors" - "fmt" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/leaderelection/resourcelock" -) - -type ConfigMapLock struct { - // ConfigMapMeta should contain a Name and a Namespace of a - // ConfigMapMeta object that the LeaderElector will attempt to lead. 
- ConfigMapMeta metav1.ObjectMeta - Client corev1client.ConfigMapsGetter - LockConfig resourcelock.ResourceLockConfig - cm *v1.ConfigMap -} - -// Get returns the election record from a ConfigMap Annotation -func (cml *ConfigMapLock) Get(ctx context.Context) (*resourcelock.LeaderElectionRecord, []byte, error) { - var record resourcelock.LeaderElectionRecord - var err error - cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{}) - if err != nil { - return nil, nil, err - } - if cml.cm.Annotations == nil { - cml.cm.Annotations = make(map[string]string) - } - recordStr, found := cml.cm.Annotations[resourcelock.LeaderElectionRecordAnnotationKey] - recordBytes := []byte(recordStr) - if found { - if err := json.Unmarshal(recordBytes, &record); err != nil { - return nil, nil, err - } - } - return &record, recordBytes, nil -} - -// Create attempts to create a LeaderElectionRecord annotation -func (cml *ConfigMapLock) Create(ctx context.Context, ler resourcelock.LeaderElectionRecord) error { - recordBytes, err := json.Marshal(ler) - if err != nil { - return err - } - cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(ctx, &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: cml.ConfigMapMeta.Name, - Namespace: cml.ConfigMapMeta.Namespace, - Annotations: map[string]string{ - resourcelock.LeaderElectionRecordAnnotationKey: string(recordBytes), - }, - }, - }, metav1.CreateOptions{}) - return err -} - -// Update will update an existing annotation on a given resource. -func (cml *ConfigMapLock) Update(ctx context.Context, ler resourcelock.LeaderElectionRecord) error { - if cml.cm == nil { - return errors.New("configmap not initialized, call get or create first") - } - recordBytes, err := json.Marshal(ler) - if err != nil { - return err - } - if cml.cm.Annotations == nil { - cml.cm.Annotations = make(map[string]string) - } - cml.cm.Annotations[resourcelock.LeaderElectionRecordAnnotationKey] = string(recordBytes) - cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{}) - if err != nil { - return err - } - cml.cm = cm - return nil -} - -// RecordEvent in leader election while adding meta-data -func (cml *ConfigMapLock) RecordEvent(s string) { - if cml.LockConfig.EventRecorder == nil { - return - } - events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s) - subject := &v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta} - // Populate the type meta, so we don't have to get it from the schema - subject.Kind = "ConfigMap" - subject.APIVersion = v1.SchemeGroupVersion.String() - cml.LockConfig.EventRecorder.Eventf(subject, v1.EventTypeNormal, "LeaderElection", events) -} - -// Describe is used to convert details on current resource lock -// into a string -func (cml *ConfigMapLock) Describe() string { - return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name) -} - -// Identity returns the Identity of the lock -func (cml *ConfigMapLock) Identity() string { - return cml.LockConfig.Identity -} From cb5a3bb1083d1dd347d79c0cd61e8e47835eed72 Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 30 May 2023 16:47:03 -0500 Subject: [PATCH 08/17] fix linter errors --- receiver/awscontainerinsightreceiver/receiver.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index 0fc53f25688f..38cdea4c1ef0 100644 --- 
a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -101,16 +101,17 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone } // TODO: enable this via config - /*endpoint, err := acir.getK8sApiServerEndpoint() + endpoint, err := acir.getK8sAPIServerEndpoint() if err != nil { acir.settings.Logger.Error("Unable to start the prometheus scraper", zap.Error(err)) } else { + acir.settings.Logger.Debug("k8sapiserver endpoint found", zap.String("endpoint", endpoint)) // use the same leader - acir.prometheusScraper, err = k8sapiserver.NewPrometheusScraper(ctx, acir.settings, endpoint, acir.nextConsumer, host, hostinfo, leaderElection) + /*acir.prometheusScraper, err = k8sapiserver.NewPrometheusScraper(ctx, acir.settings, endpoint, acir.nextConsumer, host, hostinfo, leaderElection) if err != nil { acir.settings.Logger.Error("Unable to start the prometheus scraper", zap.Error(err)) - } - }*/ + }*/ + } } if acir.config.ContainerOrchestrator == ci.ECS { @@ -196,7 +197,7 @@ func (acir *awsContainerInsightReceiver) collectData(ctx context.Context) error return nil } -func (acir *awsContainerInsightReceiver) getK8sApiServerEndpoint() (string, error) { +func (acir *awsContainerInsightReceiver) getK8sAPIServerEndpoint() (string, error) { k8sClient := k8sclient.Get(acir.settings.Logger) if k8sClient == nil { return "", errors.New("cannot start k8s client, unable to find K8sApiServer endpoint") From 8114a9507aed6f13c0c1afbd304b7b42887125ca Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 30 May 2023 18:49:16 -0500 Subject: [PATCH 09/17] attempt to fix unit tests --- .../internal/k8sapiserver/leaderelection_test.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go index 5dccc2b172bf..ae5fd1036381 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go @@ -26,11 +26,17 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" ) +func WithClient(client K8sClient) LeaderElectionOption { + return func(le *LeaderElection) { + le.k8sClient = client + } +} + func TestNewLeaderElectionOpts(t *testing.T) { t.Setenv("HOST_NAME", "hostname") t.Setenv("K8S_NAMESPACE", "namespace") - le, err := NewLeaderElection(zap.NewNop(), WithLeaderLockName("test"), WithLeaderLockUsingConfigMapOnly(true)) + le, err := NewLeaderElection(zap.NewNop(), WithClient(&mockK8sClient{}), WithLeaderLockName("test"), WithLeaderLockUsingConfigMapOnly(true)) assert.NotNil(t, le) assert.NoError(t, err) assert.Equal(t, "test", le.leaderLockName) @@ -38,7 +44,7 @@ func TestNewLeaderElectionOpts(t *testing.T) { } func TestLeaderElectionInitErrors(t *testing.T) { - le, err := NewLeaderElection(zap.NewNop()) + le, err := NewLeaderElection(zap.NewNop(), WithClient(&mockK8sClient{})) assert.Error(t, err) assert.True(t, strings.HasPrefix(err.Error(), "environment variable HOST_NAME is not set")) assert.Nil(t, le) @@ -54,9 +60,6 @@ func TestLeaderElectionInitErrors(t *testing.T) { func TestLeaderElectionEndToEnd(t *testing.T) { hostName, err := os.Hostname() assert.NoError(t, err) - k8sclientOption := func(le *LeaderElection) { - le.k8sClient = &mockK8sClient{} - } leadingOption := func(le *LeaderElection) { 
le.leading = true } @@ -69,7 +72,7 @@ func TestLeaderElectionEndToEnd(t *testing.T) { t.Setenv("HOST_NAME", hostName) t.Setenv("K8S_NAMESPACE", "namespace") - leaderElection, err := NewLeaderElection(zap.NewNop(), k8sclientOption, + leaderElection, err := NewLeaderElection(zap.NewNop(), WithClient(&mockK8sClient{}), leadingOption, broadcasterOption, isLeadingCOption) assert.NotNil(t, leaderElection) From 6481187c06ca95e843986e9b6110c2e2a89cd215 Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 30 May 2023 19:19:30 -0500 Subject: [PATCH 10/17] fix cleanup on the unit tests --- .../internal/k8sapiserver/leaderelection_test.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go index ae5fd1036381..cceeee3b5c3f 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go @@ -37,12 +37,16 @@ func TestNewLeaderElectionOpts(t *testing.T) { t.Setenv("K8S_NAMESPACE", "namespace") le, err := NewLeaderElection(zap.NewNop(), WithClient(&mockK8sClient{}), WithLeaderLockName("test"), WithLeaderLockUsingConfigMapOnly(true)) + t.Cleanup(func() { + le.cancel() + }) assert.NotNil(t, le) assert.NoError(t, err) assert.Equal(t, "test", le.leaderLockName) assert.True(t, le.leaderLockUsingConfigMapOnly) } + func TestLeaderElectionInitErrors(t *testing.T) { le, err := NewLeaderElection(zap.NewNop(), WithClient(&mockK8sClient{})) assert.Error(t, err) @@ -91,11 +95,11 @@ func TestLeaderElectionEndToEnd(t *testing.T) { <-leaderElection.isLeadingC assert.True(t, leaderElection.leading) - // shut down - leaderElection.cancel() - - assert.Eventually(t, func() bool { - return !leaderElection.leading - }, 2*time.Second, 5*time.Millisecond) + t.Cleanup(func() { + leaderElection.cancel() + assert.Eventually(t, func() bool { + return !leaderElection.leading + }, 2*time.Second, 5*time.Millisecond) + }) } From 949516d7ce5a307d1ff27bab63f1fbbb3e243103 Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 30 May 2023 19:38:13 -0500 Subject: [PATCH 11/17] merge in the daemonset/deployment changes --- .../internal/k8sapiserver/k8sapiserver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index 618caaa3016c..383947dc825b 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -162,7 +162,7 @@ func (k *K8sAPIServer) getDeploymentMetrics(clusterName, timestampNs string) []p attributes[ci.NodeNameKey] = k.nodeName } attributes[ci.SourcesKey] = "[\"apiserver\"]" - //attributes[ci.Kubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\",\"deployment_name\":\"%s\"}", + // attributes[ci.Kubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\",\"deployment_name\":\"%s\"}", // deployment.Namespace, deployment.Name) md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) metrics = append(metrics, md) @@ -192,7 +192,7 @@ func (k *K8sAPIServer) getDaemonSetMetrics(clusterName, timestampNs string) []pm attributes[ci.NodeNameKey] = k.nodeName } attributes[ci.SourcesKey] = "[\"apiserver\"]" - //attributes[ci.Kubernetes] = 
fmt.Sprintf("{\"namespace_name\":\"%s\",\"daemonset_name\":\"%s\"}", + // attributes[ci.Kubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\",\"daemonset_name\":\"%s\"}", // daemonSet.Namespace, daemonSet.Name) md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) metrics = append(metrics, md) From 46f0c14e02dca2da3b7062e86da11b29236f25fc Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 30 May 2023 20:08:49 -0500 Subject: [PATCH 12/17] fix linter errors and attempt to fix a data race --- internal/aws/k8s/k8sclient/daemonset_test.go | 3 ++- internal/aws/k8s/k8sclient/deployment_test.go | 3 ++- .../internal/k8sapiserver/prometheus_scraper_test.go | 6 ++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/aws/k8s/k8sclient/daemonset_test.go b/internal/aws/k8s/k8sclient/daemonset_test.go index 99c91f94c07e..ea664d27a583 100644 --- a/internal/aws/k8s/k8sclient/daemonset_test.go +++ b/internal/aws/k8s/k8sclient/daemonset_test.go @@ -14,6 +14,8 @@ package k8sclient import ( + "testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" @@ -21,7 +23,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" - "testing" ) var daemonSetObjects = []runtime.Object{ diff --git a/internal/aws/k8s/k8sclient/deployment_test.go b/internal/aws/k8s/k8sclient/deployment_test.go index 72a2291a41a8..2de9d737b154 100644 --- a/internal/aws/k8s/k8sclient/deployment_test.go +++ b/internal/aws/k8s/k8sclient/deployment_test.go @@ -14,6 +14,8 @@ package k8sclient import ( + "testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" @@ -21,7 +23,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" - "testing" ) var desired = int32(20) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go index cb850a657a34..934713234b31 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go @@ -138,10 +138,8 @@ func TestNewPrometheusScraperEndToEnd(t *testing.T) { assert.NotNil(t, mp) defer mp.Close() - // perform an async scrape, this will kick off the scraper process - go func() { - scraper.GetMetrics() - }() + // perform a single scrape, this will kick off the scraper process for additional scrapes + scraper.GetMetrics() t.Cleanup(func() { scraper.Shutdown() From ad5ac6e2711e012ce600eb749e719b6e68a66989 Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 30 May 2023 20:21:54 -0500 Subject: [PATCH 13/17] fix linter errors --- internal/aws/k8s/k8sclient/daemonset.go | 3 ++- internal/aws/k8s/k8sclient/deployment.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/aws/k8s/k8sclient/daemonset.go b/internal/aws/k8s/k8sclient/daemonset.go index 06c79b0a04db..3dc98ed177e6 100644 --- a/internal/aws/k8s/k8sclient/daemonset.go +++ b/internal/aws/k8s/k8sclient/daemonset.go @@ -17,6 +17,8 @@ package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-c import ( "context" "fmt" + "sync" + "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -24,7 +26,6 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - "sync" ) type 
DaemonSetClient interface { diff --git a/internal/aws/k8s/k8sclient/deployment.go b/internal/aws/k8s/k8sclient/deployment.go index 967ea35cfa0c..9a2583d9cefa 100644 --- a/internal/aws/k8s/k8sclient/deployment.go +++ b/internal/aws/k8s/k8sclient/deployment.go @@ -17,6 +17,8 @@ package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-c import ( "context" "fmt" + "sync" + "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -24,7 +26,6 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - "sync" ) type DeploymentClient interface { From 8b64dce33dc8a882e626ef5b9c48243798a35b87 Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 30 May 2023 20:35:07 -0500 Subject: [PATCH 14/17] attempt to fix data race --- .../internal/k8sapiserver/leaderelection_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go index cceeee3b5c3f..5b0793f98207 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go @@ -99,6 +99,8 @@ func TestLeaderElectionEndToEnd(t *testing.T) { leaderElection.cancel() assert.Eventually(t, func() bool { + leaderElection.mu.Lock() + defer leaderElection.mu.Unlock() return !leaderElection.leading }, 2*time.Second, 5*time.Millisecond) }) From 7be89666136c8ec96495f5c62e14608798c33f49 Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 30 May 2023 20:47:30 -0500 Subject: [PATCH 15/17] attempt to fix test panic --- .../k8sapiserver/leaderelection_test.go | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go index 5b0793f98207..8ab76f2736d7 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection_test.go @@ -32,21 +32,6 @@ func WithClient(client K8sClient) LeaderElectionOption { } } -func TestNewLeaderElectionOpts(t *testing.T) { - t.Setenv("HOST_NAME", "hostname") - t.Setenv("K8S_NAMESPACE", "namespace") - - le, err := NewLeaderElection(zap.NewNop(), WithClient(&mockK8sClient{}), WithLeaderLockName("test"), WithLeaderLockUsingConfigMapOnly(true)) - t.Cleanup(func() { - le.cancel() - }) - assert.NotNil(t, le) - assert.NoError(t, err) - assert.Equal(t, "test", le.leaderLockName) - assert.True(t, le.leaderLockUsingConfigMapOnly) - -} - func TestLeaderElectionInitErrors(t *testing.T) { le, err := NewLeaderElection(zap.NewNop(), WithClient(&mockK8sClient{})) assert.Error(t, err) @@ -77,10 +62,12 @@ func TestLeaderElectionEndToEnd(t *testing.T) { t.Setenv("HOST_NAME", hostName) t.Setenv("K8S_NAMESPACE", "namespace") leaderElection, err := NewLeaderElection(zap.NewNop(), WithClient(&mockK8sClient{}), - leadingOption, broadcasterOption, isLeadingCOption) + leadingOption, broadcasterOption, isLeadingCOption, WithLeaderLockName("test"), WithLeaderLockUsingConfigMapOnly(true)) assert.NotNil(t, leaderElection) assert.NoError(t, err) + assert.Equal(t, "test", leaderElection.leaderLockName) + assert.True(t, leaderElection.leaderLockUsingConfigMapOnly) 
mockClient.On("NamespaceToRunningPodNum").Return(map[string]int{"default": 2}) mockClient.On("ClusterFailedNodeCount").Return(1) From 58ab7cae167bf446a7ecb3af2de35668f23eeb06 Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Tue, 30 May 2023 20:59:13 -0500 Subject: [PATCH 16/17] fix k8sclient unit tests --- internal/aws/k8s/k8sclient/daemonset_test.go | 2 +- internal/aws/k8s/k8sclient/deployment_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/aws/k8s/k8sclient/daemonset_test.go b/internal/aws/k8s/k8sclient/daemonset_test.go index ea664d27a583..d54874c37f80 100644 --- a/internal/aws/k8s/k8sclient/daemonset_test.go +++ b/internal/aws/k8s/k8sclient/daemonset_test.go @@ -89,7 +89,7 @@ func TestDaemonSetClient(t *testing.T) { }, } actual := client.DaemonSetInfos() - assert.Equal(t, expected, actual) + assert.EqualValues(t, expected, actual) client.shutdown() assert.True(t, client.stopped) } diff --git a/internal/aws/k8s/k8sclient/deployment_test.go b/internal/aws/k8s/k8sclient/deployment_test.go index 2de9d737b154..54c1f9627526 100644 --- a/internal/aws/k8s/k8sclient/deployment_test.go +++ b/internal/aws/k8s/k8sclient/deployment_test.go @@ -99,7 +99,7 @@ func TestDeploymentClient(t *testing.T) { }, } actual := client.DeploymentInfos() - assert.Equal(t, expected, actual) + assert.EqualValues(t, expected, actual) client.shutdown() assert.True(t, client.stopped) } From c12b747dc151fa1643a36c0a66e1d99cb92208df Mon Sep 17 00:00:00 2001 From: Chad Patel Date: Wed, 31 May 2023 10:26:46 -0500 Subject: [PATCH 17/17] fix k8sclient unit tests by sorting the results --- internal/aws/k8s/k8sclient/daemonset_test.go | 6 +++++- internal/aws/k8s/k8sclient/deployment_test.go | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/internal/aws/k8s/k8sclient/daemonset_test.go b/internal/aws/k8s/k8sclient/daemonset_test.go index d54874c37f80..bf90a1698e3d 100644 --- a/internal/aws/k8s/k8sclient/daemonset_test.go +++ b/internal/aws/k8s/k8sclient/daemonset_test.go @@ -14,6 +14,7 @@ package k8sclient import ( + "sort" "testing" "github.com/stretchr/testify/assert" @@ -89,7 +90,10 @@ func TestDaemonSetClient(t *testing.T) { }, } actual := client.DaemonSetInfos() - assert.EqualValues(t, expected, actual) + sort.Slice(actual, func(i, j int) bool { + return actual[i].Name < actual[j].Name + }) + assert.Equal(t, expected, actual) client.shutdown() assert.True(t, client.stopped) } diff --git a/internal/aws/k8s/k8sclient/deployment_test.go b/internal/aws/k8s/k8sclient/deployment_test.go index 54c1f9627526..44ee79b94cca 100644 --- a/internal/aws/k8s/k8sclient/deployment_test.go +++ b/internal/aws/k8s/k8sclient/deployment_test.go @@ -14,6 +14,7 @@ package k8sclient import ( + "sort" "testing" "github.com/stretchr/testify/assert" @@ -99,7 +100,10 @@ func TestDeploymentClient(t *testing.T) { }, } actual := client.DeploymentInfos() - assert.EqualValues(t, expected, actual) + sort.Slice(actual, func(i, j int) bool { + return actual[i].Name < actual[j].Name + }) + assert.Equal(t, expected, actual) client.shutdown() assert.True(t, client.stopped) }
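
The final two commits make the k8sclient tests order-independent: the informer-backed listings can come back in nondeterministic order, so the results are sorted by `Name` before comparison. A minimal standalone sketch of the same pattern follows; the `info` type here is a stand-in for the daemonset/deployment info structs, not the receiver's actual types.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
)

// info stands in for the k8sclient info structs, which expose a Name field
// that the tests can sort on.
type info struct {
	Name string
}

func main() {
	expected := []*info{{Name: "daemonset-a"}, {Name: "daemonset-b"}}

	// Simulate a store that returns items in nondeterministic order.
	actual := []*info{{Name: "daemonset-b"}, {Name: "daemonset-a"}}

	// Sort by Name so the comparison no longer depends on iteration order,
	// just as the updated tests do before calling assert.Equal.
	sort.Slice(actual, func(i, j int) bool {
		return actual[i].Name < actual[j].Name
	})

	fmt.Println(reflect.DeepEqual(expected, actual)) // true
}
```

Sorting only the actual slice is sufficient as long as the hard-coded expected slice is written in sorted order.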