From 1b6095de645d314cb139e002372ce0d638502613 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Thu, 9 May 2024 18:09:12 +0200 Subject: [PATCH 1/9] feat: add shutdown method for podmanreceiver --- receiver/podmanreceiver/factory.go | 8 ++++- receiver/podmanreceiver/factory_test.go | 14 --------- receiver/podmanreceiver/receiver.go | 39 ++++++++++++------------ receiver/podmanreceiver/receiver_test.go | 29 ++++++++---------- 4 files changed, 40 insertions(+), 50 deletions(-) diff --git a/receiver/podmanreceiver/factory.go b/receiver/podmanreceiver/factory.go index f154e628d3ea..c07df858a585 100644 --- a/receiver/podmanreceiver/factory.go +++ b/receiver/podmanreceiver/factory.go @@ -50,5 +50,11 @@ func createMetricsReceiver( consumer consumer.Metrics, ) (receiver.Metrics, error) { podmanConfig := config.(*Config) - return newMetricsReceiver(ctx, params, podmanConfig, consumer, nil) + + recv := newMetricsReceiver(params, podmanConfig, nil) + scrp, err := scraperhelper.NewScraper(metadata.Type.String(), recv.scrape, scraperhelper.WithStart(recv.start), scraperhelper.WithShutdown(recv.shutdown)) + if err != nil { + return nil, err + } + return scraperhelper.NewScraperControllerReceiver(&recv.config.ControllerConfig, params, consumer, scraperhelper.AddScraper(scrp)) } diff --git a/receiver/podmanreceiver/factory_test.go b/receiver/podmanreceiver/factory_test.go index e7b3eae1f1d1..47fad4e19766 100644 --- a/receiver/podmanreceiver/factory_test.go +++ b/receiver/podmanreceiver/factory_test.go @@ -38,17 +38,3 @@ func TestCreateReceiver(t *testing.T) { assert.NoError(t, err, "Metric receiver creation failed") assert.NotNil(t, metricReceiver, "Receiver creation failed") } - -func TestCreateInvalidEndpoint(t *testing.T) { - factory := NewFactory() - config := factory.CreateDefaultConfig() - receiverCfg := config.(*Config) - - receiverCfg.Endpoint = "" - - params := receivertest.NewNopCreateSettings() - recv, err := factory.CreateMetricsReceiver(context.Background(), params, receiverCfg, consumertest.NewNop()) - assert.Nil(t, recv) - assert.Error(t, err) - assert.Equal(t, "config.Endpoint must be specified", err.Error()) -} diff --git a/receiver/podmanreceiver/receiver.go b/receiver/podmanreceiver/receiver.go index 4ea3cc218468..4e2b1e3d546b 100644 --- a/receiver/podmanreceiver/receiver.go +++ b/receiver/podmanreceiver/receiver.go @@ -12,12 +12,10 @@ import ( "time" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/scrapererror" - "go.opentelemetry.io/collector/receiver/scraperhelper" "go.uber.org/multierr" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/podmanreceiver/internal/metadata" @@ -29,40 +27,32 @@ type metricsReceiver struct { clientFactory clientFactory scraper *ContainerScraper mb *metadata.MetricsBuilder + cancel context.CancelFunc } func newMetricsReceiver( - _ context.Context, set receiver.CreateSettings, config *Config, - nextConsumer consumer.Metrics, clientFactory clientFactory, -) (receiver.Metrics, error) { - err := config.Validate() - if err != nil { - return nil, err - } - +) *metricsReceiver { if clientFactory == nil { clientFactory = newLibpodClient } - recv := &metricsReceiver{ + return &metricsReceiver{ config: config, clientFactory: clientFactory, set: set, mb: metadata.NewMetricsBuilder(config.MetricsBuilderConfig, set), } +} - scrp, err := scraperhelper.NewScraper(metadata.Type.String(), recv.scrape, scraperhelper.WithStart(recv.start)) +func (r *metricsReceiver) start(ctx context.Context, _ component.Host) error { + err := r.config.Validate() if err != nil { - return nil, err + return err } - return scraperhelper.NewScraperControllerReceiver(&recv.config.ControllerConfig, set, nextConsumer, scraperhelper.AddScraper(scrp)) -} -func (r *metricsReceiver) start(ctx context.Context, _ component.Host) error { - var err error podmanClient, err := r.clientFactory(r.set.Logger, r.config) if err != nil { return err @@ -72,7 +62,19 @@ func (r *metricsReceiver) start(ctx context.Context, _ component.Host) error { if err = r.scraper.loadContainerList(ctx); err != nil { return err } - go r.scraper.containerEventLoop(ctx) + + cctx, cancel := context.WithCancel(ctx) + r.cancel = cancel + + go r.scraper.containerEventLoop(cctx) + + return nil +} + +func (r *metricsReceiver) shutdown(context.Context) error { + if r.cancel != nil { + r.cancel() + } return nil } @@ -136,7 +138,6 @@ func (r *metricsReceiver) recordCPUMetrics(now pcommon.Timestamp, stats *contain for i, cpu := range stats.PerCPU { r.mb.RecordContainerCPUUsagePercpuDataPoint(now, int64(toSecondsWithNanosecondPrecision(cpu)), fmt.Sprintf("cpu%d", i)) } - } func (r *metricsReceiver) recordNetworkMetrics(now pcommon.Timestamp, stats *containerStats) { diff --git a/receiver/podmanreceiver/receiver_test.go b/receiver/podmanreceiver/receiver_test.go index cd08f24bdfda..63766599a58c 100644 --- a/receiver/podmanreceiver/receiver_test.go +++ b/receiver/podmanreceiver/receiver_test.go @@ -16,7 +16,6 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" "go.opentelemetry.io/collector/receiver/scraperhelper" @@ -31,21 +30,20 @@ func TestNewReceiver(t *testing.T) { InitialDelay: time.Second, }, } - nextConsumer := consumertest.NewNop() - mr, err := newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), config, nextConsumer, nil) - + mr := newMetricsReceiver(receivertest.NewNopCreateSettings(), config, nil) assert.NotNil(t, mr) - assert.NoError(t, err) } -func TestNewReceiverErrors(t *testing.T) { - r, err := newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), &Config{}, consumertest.NewNop(), nil) - assert.Nil(t, r) +func TestErrorsInStart(t *testing.T) { + recv := newMetricsReceiver(receivertest.NewNopCreateSettings(), &Config{}, nil) + assert.NotNil(t, recv) + err := recv.start(context.Background(), componenttest.NewNopHost()) require.Error(t, err) assert.Equal(t, "config.Endpoint must be specified", err.Error()) - r, err = newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), &Config{Endpoint: "someEndpoint"}, consumertest.NewNop(), nil) - assert.Nil(t, r) + recv = newMetricsReceiver(receivertest.NewNopCreateSettings(), &Config{Endpoint: "someEndpoint"}, nil) + assert.NotNil(t, recv) + err = recv.start(context.Background(), componenttest.NewNopHost()) require.Error(t, err) assert.Equal(t, "config.CollectionInterval must be specified", err.Error()) } @@ -55,13 +53,11 @@ func TestScraperLoop(t *testing.T) { cfg.CollectionInterval = 100 * time.Millisecond client := make(mockClient) - consumer := make(mockConsumer) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - r, err := newMetricsReceiver(ctx, receivertest.NewNopCreateSettings(), cfg, consumer, client.factory) - require.NoError(t, err) + r := newMetricsReceiver(receivertest.NewNopCreateSettings(), cfg, client.factory) assert.NotNil(t, r) go func() { @@ -74,14 +70,15 @@ func TestScraperLoop(t *testing.T) { } }() - assert.NoError(t, r.Start(ctx, componenttest.NewNopHost())) + assert.NoError(t, r.start(ctx, componenttest.NewNopHost())) - md := <-consumer + md, err := r.scrape(ctx) + assert.NoError(t, err) assert.Equal(t, 1, md.ResourceMetrics().Len()) assertStatsEqualToMetrics(t, genContainerStats(), md) - assert.NoError(t, r.Shutdown(ctx)) + assert.NoError(t, r.shutdown(ctx)) } type mockClient chan containerStatsReport From 08397a8d523820b850615a49b12f51b887d7ff47 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Fri, 10 May 2024 16:29:05 +0200 Subject: [PATCH 2/9] fix linter --- receiver/podmanreceiver/factory.go | 2 +- receiver/podmanreceiver/receiver_test.go | 13 ------------- 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/receiver/podmanreceiver/factory.go b/receiver/podmanreceiver/factory.go index c07df858a585..cc5221f899fb 100644 --- a/receiver/podmanreceiver/factory.go +++ b/receiver/podmanreceiver/factory.go @@ -44,7 +44,7 @@ func createDefaultReceiverConfig() component.Config { } func createMetricsReceiver( - ctx context.Context, + _ context.Context, params receiver.CreateSettings, config component.Config, consumer consumer.Metrics, diff --git a/receiver/podmanreceiver/receiver_test.go b/receiver/podmanreceiver/receiver_test.go index 63766599a58c..8db911c00bc5 100644 --- a/receiver/podmanreceiver/receiver_test.go +++ b/receiver/podmanreceiver/receiver_test.go @@ -15,8 +15,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" "go.opentelemetry.io/collector/receiver/scraperhelper" "go.uber.org/zap" @@ -99,8 +97,6 @@ func (c mockClient) ping(context.Context) error { return nil } -type mockConsumer chan pmetric.Metrics - func (c mockClient) list(context.Context, url.Values) ([]container, error) { return []container{{ID: "c1", Image: "localimage"}}, nil } @@ -108,12 +104,3 @@ func (c mockClient) list(context.Context, url.Values) ([]container, error) { func (c mockClient) events(context.Context, url.Values) (<-chan event, <-chan error) { return nil, nil } - -func (m mockConsumer) Capabilities() consumer.Capabilities { - return consumer.Capabilities{} -} - -func (m mockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error { - m <- md - return nil -} From 2bf6a4a07f474eba20969e6a925f932e5f840cb1 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Sat, 11 May 2024 13:10:33 +0200 Subject: [PATCH 3/9] defer shutdown assertion --- receiver/podmanreceiver/receiver_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/receiver/podmanreceiver/receiver_test.go b/receiver/podmanreceiver/receiver_test.go index 8db911c00bc5..eb8e691e2b01 100644 --- a/receiver/podmanreceiver/receiver_test.go +++ b/receiver/podmanreceiver/receiver_test.go @@ -69,14 +69,13 @@ func TestScraperLoop(t *testing.T) { }() assert.NoError(t, r.start(ctx, componenttest.NewNopHost())) + defer assert.NoError(t, r.shutdown(ctx)) md, err := r.scrape(ctx) assert.NoError(t, err) assert.Equal(t, 1, md.ResourceMetrics().Len()) assertStatsEqualToMetrics(t, genContainerStats(), md) - - assert.NoError(t, r.shutdown(ctx)) } type mockClient chan containerStatsReport From 6b568e26770cbf241870074a48335b72bcfe0525 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Sat, 11 May 2024 13:12:41 +0200 Subject: [PATCH 4/9] feat: use background context for long-running operation --- receiver/podmanreceiver/receiver.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/receiver/podmanreceiver/receiver.go b/receiver/podmanreceiver/receiver.go index 4e2b1e3d546b..e77a1943ff22 100644 --- a/receiver/podmanreceiver/receiver.go +++ b/receiver/podmanreceiver/receiver.go @@ -63,7 +63,8 @@ func (r *metricsReceiver) start(ctx context.Context, _ component.Host) error { return err } - cctx, cancel := context.WithCancel(ctx) + // context for long-running operation + cctx, cancel := context.WithCancel(context.Background()) r.cancel = cancel go r.scraper.containerEventLoop(cctx) From 9a82696392307fbfebc7b148b0239b4aa2eaf721 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Sat, 11 May 2024 17:03:46 +0200 Subject: [PATCH 5/9] fix windows factory --- receiver/podmanreceiver/factory.go | 18 ------------------ receiver/podmanreceiver/receiver.go | 18 ++++++++++++++++++ receiver/podmanreceiver/receiver_windows.go | 13 ++++++++++++- .../podmanreceiver/receiver_windows_test.go | 3 +-- 4 files changed, 31 insertions(+), 21 deletions(-) diff --git a/receiver/podmanreceiver/factory.go b/receiver/podmanreceiver/factory.go index cc5221f899fb..f24b9c8a7f82 100644 --- a/receiver/podmanreceiver/factory.go +++ b/receiver/podmanreceiver/factory.go @@ -4,11 +4,9 @@ package podmanreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/podmanreceiver" import ( - "context" "time" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/scraperhelper" @@ -42,19 +40,3 @@ func createDefaultConfig() *Config { func createDefaultReceiverConfig() component.Config { return createDefaultConfig() } - -func createMetricsReceiver( - _ context.Context, - params receiver.CreateSettings, - config component.Config, - consumer consumer.Metrics, -) (receiver.Metrics, error) { - podmanConfig := config.(*Config) - - recv := newMetricsReceiver(params, podmanConfig, nil) - scrp, err := scraperhelper.NewScraper(metadata.Type.String(), recv.scrape, scraperhelper.WithStart(recv.start), scraperhelper.WithShutdown(recv.shutdown)) - if err != nil { - return nil, err - } - return scraperhelper.NewScraperControllerReceiver(&recv.config.ControllerConfig, params, consumer, scraperhelper.AddScraper(scrp)) -} diff --git a/receiver/podmanreceiver/receiver.go b/receiver/podmanreceiver/receiver.go index e77a1943ff22..585b6f074bb5 100644 --- a/receiver/podmanreceiver/receiver.go +++ b/receiver/podmanreceiver/receiver.go @@ -12,10 +12,12 @@ import ( "time" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/scrapererror" + "go.opentelemetry.io/collector/receiver/scraperhelper" "go.uber.org/multierr" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/podmanreceiver/internal/metadata" @@ -47,6 +49,22 @@ func newMetricsReceiver( } } +func createMetricsReceiver( + _ context.Context, + params receiver.CreateSettings, + config component.Config, + consumer consumer.Metrics, +) (receiver.Metrics, error) { + podmanConfig := config.(*Config) + + recv := newMetricsReceiver(params, podmanConfig, nil) + scrp, err := scraperhelper.NewScraper(metadata.Type.String(), recv.scrape, scraperhelper.WithStart(recv.start), scraperhelper.WithShutdown(recv.shutdown)) + if err != nil { + return nil, err + } + return scraperhelper.NewScraperControllerReceiver(&recv.config.ControllerConfig, params, consumer, scraperhelper.AddScraper(scrp)) +} + func (r *metricsReceiver) start(ctx context.Context, _ component.Host) error { err := r.config.Validate() if err != nil { diff --git a/receiver/podmanreceiver/receiver_windows.go b/receiver/podmanreceiver/receiver_windows.go index a2811ef4b345..e93ea54432d3 100644 --- a/receiver/podmanreceiver/receiver_windows.go +++ b/receiver/podmanreceiver/receiver_windows.go @@ -7,12 +7,12 @@ import ( "context" "fmt" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" ) func newMetricsReceiver( - _ context.Context, _ receiver.CreateSettings, _ *Config, _ consumer.Metrics, @@ -20,3 +20,14 @@ func newMetricsReceiver( ) (receiver.Metrics, error) { return nil, fmt.Errorf("podman receiver is not supported on windows") } + +func createMetricsReceiver( + _ context.Context, + params receiver.CreateSettings, + config component.Config, + consumer consumer.Metrics, +) (receiver.Metrics, error) { + podmanConfig := config.(*Config) + + return newMetricsReceiver(params, podmanConfig, nil, consumer) +} diff --git a/receiver/podmanreceiver/receiver_windows_test.go b/receiver/podmanreceiver/receiver_windows_test.go index df40abb722ed..460c286613c9 100644 --- a/receiver/podmanreceiver/receiver_windows_test.go +++ b/receiver/podmanreceiver/receiver_windows_test.go @@ -4,7 +4,6 @@ package podmanreceiver import ( - "context" "testing" "github.com/stretchr/testify/assert" @@ -13,7 +12,7 @@ import ( ) func TestNewReceiver(t *testing.T) { - mr, err := newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), &Config{}, consumertest.NewNop(), nil) + mr, err := newMetricsReceiver(receivertest.NewNopCreateSettings(), &Config{}, consumertest.NewNop(), nil) assert.Nil(t, mr) assert.Error(t, err) assert.Equal(t, "podman receiver is not supported on windows", err.Error()) From c00c4a51be6f35aec3f9e80a39c859d43ee1eba4 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Sat, 11 May 2024 17:41:42 +0200 Subject: [PATCH 6/9] chore: add changelog --- .chloggen/add_shutdown_podman.yaml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/add_shutdown_podman.yaml diff --git a/.chloggen/add_shutdown_podman.yaml b/.chloggen/add_shutdown_podman.yaml new file mode 100644 index 000000000000..b60f6bd13fea --- /dev/null +++ b/.chloggen/add_shutdown_podman.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: podmanreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add scraper's shutdown method + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29994] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From 50419a1e6b5a94404e1f800c367a1edb51e8041b Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Mon, 13 May 2024 20:24:37 +0200 Subject: [PATCH 7/9] bug_fix changelog Co-authored-by: Curtis Robert --- .chloggen/add_shutdown_podman.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/add_shutdown_podman.yaml b/.chloggen/add_shutdown_podman.yaml index b60f6bd13fea..aa2cbbdced4e 100644 --- a/.chloggen/add_shutdown_podman.yaml +++ b/.chloggen/add_shutdown_podman.yaml @@ -1,7 +1,7 @@ # Use this changelog template to create an entry for release notes. # One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: enhancement +change_type: bug_fix # The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) component: podmanreceiver From 863440d8064ae3312c8471cf95bcb4327a87ecd3 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Mon, 13 May 2024 20:25:17 +0200 Subject: [PATCH 8/9] defer anonymous shutdown test function Co-authored-by: Curtis Robert --- receiver/podmanreceiver/receiver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/podmanreceiver/receiver_test.go b/receiver/podmanreceiver/receiver_test.go index eb8e691e2b01..8b2210071ed8 100644 --- a/receiver/podmanreceiver/receiver_test.go +++ b/receiver/podmanreceiver/receiver_test.go @@ -69,7 +69,7 @@ func TestScraperLoop(t *testing.T) { }() assert.NoError(t, r.start(ctx, componenttest.NewNopHost())) - defer assert.NoError(t, r.shutdown(ctx)) + defer func(){ assert.NoError(t, r.shutdown(ctx)) }() md, err := r.scrape(ctx) assert.NoError(t, err) From 18d216c03d2c9ac602abfec2513eda0c4bc26ba6 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Thu, 16 May 2024 08:25:54 +0200 Subject: [PATCH 9/9] Update receiver/podmanreceiver/receiver_test.go Co-authored-by: Curtis Robert --- receiver/podmanreceiver/receiver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/podmanreceiver/receiver_test.go b/receiver/podmanreceiver/receiver_test.go index 8b2210071ed8..fb3704c549f8 100644 --- a/receiver/podmanreceiver/receiver_test.go +++ b/receiver/podmanreceiver/receiver_test.go @@ -69,7 +69,7 @@ func TestScraperLoop(t *testing.T) { }() assert.NoError(t, r.start(ctx, componenttest.NewNopHost())) - defer func(){ assert.NoError(t, r.shutdown(ctx)) }() + defer func() { assert.NoError(t, r.shutdown(ctx)) }() md, err := r.scrape(ctx) assert.NoError(t, err)