From 6d14f5701f34dd8d97878c2767b544e9452d8d2e Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Thu, 17 Oct 2024 00:08:31 -0700 Subject: [PATCH 1/2] [receiver/ntp] add initial implementation --- receiver/ntpreceiver/config.go | 19 ++++++ receiver/ntpreceiver/config_test.go | 65 +++++++++++++++++++ receiver/ntpreceiver/documentation.md | 2 +- receiver/ntpreceiver/factory.go | 24 ++++++- receiver/ntpreceiver/factory_test.go | 18 +++++ .../ntpreceiver/generated_component_test.go | 51 +++++++++++++++ receiver/ntpreceiver/go.mod | 3 +- receiver/ntpreceiver/go.sum | 2 + .../internal/metadata/generated_metrics.go | 6 +- .../metadata/generated_metrics_test.go | 4 +- receiver/ntpreceiver/metadata.yaml | 6 +- receiver/ntpreceiver/receiver.go | 47 ++++++++++++++ 12 files changed, 233 insertions(+), 14 deletions(-) create mode 100644 receiver/ntpreceiver/config_test.go create mode 100644 receiver/ntpreceiver/factory_test.go create mode 100644 receiver/ntpreceiver/receiver.go diff --git a/receiver/ntpreceiver/config.go b/receiver/ntpreceiver/config.go index b5f78740c498..934d3df77021 100644 --- a/receiver/ntpreceiver/config.go +++ b/receiver/ntpreceiver/config.go @@ -4,6 +4,11 @@ package ntpreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/ntpreceiver" import ( + "errors" + "fmt" + "net" + "time" + "go.opentelemetry.io/collector/receiver/scraperhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/ntpreceiver/internal/metadata" @@ -13,5 +18,19 @@ import ( type Config struct { scraperhelper.ControllerConfig `mapstructure:",squash"` metadata.MetricsBuilderConfig `mapstructure:",squash"` + Version int `mapstructure:"version"` Endpoint string `mapstructure:"endpoint"` } + +func (c *Config) Validate() error { + var errs []error + _, _, err := net.SplitHostPort(c.Endpoint) + if err != nil { + errs = append(errs, err) + } + // respect terms of service https://www.pool.ntp.org/tos.html + if c.ControllerConfig.CollectionInterval < 30*time.Minute { + errs = append(errs, fmt.Errorf("collection interval %v is less than minimum 30m", c.ControllerConfig.CollectionInterval)) + } + return errors.Join(errs...) +} diff --git a/receiver/ntpreceiver/config_test.go b/receiver/ntpreceiver/config_test.go new file mode 100644 index 000000000000..1ba68eec7f26 --- /dev/null +++ b/receiver/ntpreceiver/config_test.go @@ -0,0 +1,65 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ntpreceiver + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/receiver/scraperhelper" +) + +func TestValidate(t *testing.T) { + for _, tt := range []struct { + name string + c *Config + errorExpected string + }{ + { + name: "no host", + c: &Config{ + Version: 4, + Endpoint: "", + ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 45 * time.Minute}, + }, + errorExpected: "missing port in address", + }, + { + name: "no port", + c: &Config{ + Version: 4, + Endpoint: "pool.ntp.org", + ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 45 * time.Minute}, + }, + errorExpected: "address pool.ntp.org: missing port in address", + }, + { + name: "valid", + c: &Config{ + Version: 4, + Endpoint: "pool.ntp.org:123", + ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 45 * time.Minute}, + }, + }, + { + name: "interval too small", + c: &Config{ + Version: 4, + Endpoint: "pool.ntp.org:123", + ControllerConfig: scraperhelper.ControllerConfig{CollectionInterval: 29 * time.Minute}, + }, + errorExpected: "collection interval 29m0s is less than minimum 30m", + }, + } { + t.Run(tt.name, func(t *testing.T) { + err := tt.c.Validate() + if tt.errorExpected == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tt.errorExpected) + } + }) + } +} diff --git a/receiver/ntpreceiver/documentation.md b/receiver/ntpreceiver/documentation.md index 182193963883..b999ef23396d 100644 --- a/receiver/ntpreceiver/documentation.md +++ b/receiver/ntpreceiver/documentation.md @@ -18,7 +18,7 @@ Time difference between local and NTP server clocks in seconds. | Unit | Metric Type | Value Type | | ---- | ----------- | ---------- | -| s | Gauge | Int | +| s | Gauge | Double | ## Resource Attributes diff --git a/receiver/ntpreceiver/factory.go b/receiver/ntpreceiver/factory.go index 07b3a9a7534f..7ff2d5a4be22 100644 --- a/receiver/ntpreceiver/factory.go +++ b/receiver/ntpreceiver/factory.go @@ -5,6 +5,7 @@ package ntpreceiver // import "github.com/open-telemetry/opentelemetry-collector import ( "context" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -23,12 +24,29 @@ func NewFactory() receiver.Factory { } func createDefaultConfig() component.Config { + scraperConfig := scraperhelper.NewDefaultControllerConfig() + scraperConfig.CollectionInterval = 30 * time.Minute return &Config{ - ControllerConfig: scraperhelper.NewDefaultControllerConfig(), + ControllerConfig: scraperConfig, MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), + Version: 4, + Endpoint: "pool.ntp.org:123", } } -func createMetricsReceiver(_ context.Context, _ receiver.Settings, _ component.Config, _ consumer.Metrics) (receiver.Metrics, error) { - return nil, nil +func createMetricsReceiver(_ context.Context, settings receiver.Settings, cfg component.Config, consumer consumer.Metrics) (receiver.Metrics, error) { + rCfg := cfg.(*Config) + mp := newScraper(rCfg, settings) + s, err := scraperhelper.NewScraper(metadata.Type, mp.scrape) + if err != nil { + return nil, err + } + opt := scraperhelper.AddScraper(s) + + return scraperhelper.NewScraperControllerReceiver( + &rCfg.ControllerConfig, + settings, + consumer, + opt, + ) } diff --git a/receiver/ntpreceiver/factory_test.go b/receiver/ntpreceiver/factory_test.go new file mode 100644 index 000000000000..718ced896c44 --- /dev/null +++ b/receiver/ntpreceiver/factory_test.go @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ntpreceiver + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestCreateDefaultConfig(t *testing.T) { + c := createDefaultConfig().(*Config) + require.Equal(t, 4, c.Version) + require.Equal(t, "pool.ntp.org:123", c.Endpoint) + require.Equal(t, 30*time.Minute, c.CollectionInterval) +} diff --git a/receiver/ntpreceiver/generated_component_test.go b/receiver/ntpreceiver/generated_component_test.go index 5e8b22347941..a454a725fcfc 100644 --- a/receiver/ntpreceiver/generated_component_test.go +++ b/receiver/ntpreceiver/generated_component_test.go @@ -3,10 +3,16 @@ package ntpreceiver import ( + "context" "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" ) func TestComponentFactoryType(t *testing.T) { @@ -16,3 +22,48 @@ func TestComponentFactoryType(t *testing.T) { func TestComponentConfigStruct(t *testing.T) { require.NoError(t, componenttest.CheckConfigStruct(NewFactory().CreateDefaultConfig())) } + +func TestComponentLifecycle(t *testing.T) { + factory := NewFactory() + + tests := []struct { + name string + createFn func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) + }{ + + { + name: "metrics", + createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateMetrics(ctx, set, cfg, consumertest.NewNop()) + }, + }, + } + + cm, err := confmaptest.LoadConf("metadata.yaml") + require.NoError(t, err) + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub("tests::config") + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(&cfg)) + + for _, tt := range tests { + t.Run(tt.name+"-shutdown", func(t *testing.T) { + c, err := tt.createFn(context.Background(), receivertest.NewNopSettings(), cfg) + require.NoError(t, err) + err = c.Shutdown(context.Background()) + require.NoError(t, err) + }) + t.Run(tt.name+"-lifecycle", func(t *testing.T) { + firstRcvr, err := tt.createFn(context.Background(), receivertest.NewNopSettings(), cfg) + require.NoError(t, err) + host := componenttest.NewNopHost() + require.NoError(t, err) + require.NoError(t, firstRcvr.Start(context.Background(), host)) + require.NoError(t, firstRcvr.Shutdown(context.Background())) + secondRcvr, err := tt.createFn(context.Background(), receivertest.NewNopSettings(), cfg) + require.NoError(t, err) + require.NoError(t, secondRcvr.Start(context.Background(), host)) + require.NoError(t, secondRcvr.Shutdown(context.Background())) + }) + } +} diff --git a/receiver/ntpreceiver/go.mod b/receiver/ntpreceiver/go.mod index 71d67ef93dd2..2d20374b13e1 100644 --- a/receiver/ntpreceiver/go.mod +++ b/receiver/ntpreceiver/go.mod @@ -3,11 +3,13 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/ntprec go 1.22.0 require ( + github.com/beevik/ntp v1.4.3 github.com/google/go-cmp v0.6.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.111.1-0.20241008154146-ea48c09c31ae go.opentelemetry.io/collector/confmap v1.17.1-0.20241008154146-ea48c09c31ae go.opentelemetry.io/collector/consumer v0.111.1-0.20241008154146-ea48c09c31ae + go.opentelemetry.io/collector/consumer/consumertest v0.111.1-0.20241008154146-ea48c09c31ae go.opentelemetry.io/collector/filter v0.111.1-0.20241008154146-ea48c09c31ae go.opentelemetry.io/collector/pdata v1.17.1-0.20241008154146-ea48c09c31ae go.opentelemetry.io/collector/receiver v0.111.1-0.20241008154146-ea48c09c31ae @@ -34,7 +36,6 @@ require ( github.com/rogpeppe/go-internal v1.12.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.111.1-0.20241008154146-ea48c09c31ae // indirect go.opentelemetry.io/collector/consumer/consumerprofiles v0.111.1-0.20241008154146-ea48c09c31ae // indirect - go.opentelemetry.io/collector/consumer/consumertest v0.111.1-0.20241008154146-ea48c09c31ae // indirect go.opentelemetry.io/collector/internal/globalsignal v0.111.1-0.20241008154146-ea48c09c31ae // indirect go.opentelemetry.io/collector/pdata/pprofile v0.111.1-0.20241008154146-ea48c09c31ae // indirect go.opentelemetry.io/collector/pipeline v0.111.1-0.20241008154146-ea48c09c31ae // indirect diff --git a/receiver/ntpreceiver/go.sum b/receiver/ntpreceiver/go.sum index 65f0a94a89a6..7c55d9505af5 100644 --- a/receiver/ntpreceiver/go.sum +++ b/receiver/ntpreceiver/go.sum @@ -1,3 +1,5 @@ +github.com/beevik/ntp v1.4.3 h1:PlbTvE5NNy4QHmA4Mg57n7mcFTmr1W1j3gcK7L1lqho= +github.com/beevik/ntp v1.4.3/go.mod h1:Unr8Zg+2dRn7d8bHFuehIMSvvUYssHMxW3Q5Nx4RW5Q= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/receiver/ntpreceiver/internal/metadata/generated_metrics.go b/receiver/ntpreceiver/internal/metadata/generated_metrics.go index afc023c251d6..3c3db06adf0a 100644 --- a/receiver/ntpreceiver/internal/metadata/generated_metrics.go +++ b/receiver/ntpreceiver/internal/metadata/generated_metrics.go @@ -26,14 +26,14 @@ func (m *metricNtpOffset) init() { m.data.SetEmptyGauge() } -func (m *metricNtpOffset) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { +func (m *metricNtpOffset) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) { if !m.config.Enabled { return } dp := m.data.Gauge().DataPoints().AppendEmpty() dp.SetStartTimestamp(start) dp.SetTimestamp(ts) - dp.SetIntValue(val) + dp.SetDoubleValue(val) } // updateCapacity saves max length of data point slices that will be used for the slice capacity. @@ -210,7 +210,7 @@ func (mb *MetricsBuilder) Emit(options ...ResourceMetricsOption) pmetric.Metrics } // RecordNtpOffsetDataPoint adds a data point to ntp.offset metric. -func (mb *MetricsBuilder) RecordNtpOffsetDataPoint(ts pcommon.Timestamp, val int64) { +func (mb *MetricsBuilder) RecordNtpOffsetDataPoint(ts pcommon.Timestamp, val float64) { mb.metricNtpOffset.recordDataPoint(mb.startTime, ts, val) } diff --git a/receiver/ntpreceiver/internal/metadata/generated_metrics_test.go b/receiver/ntpreceiver/internal/metadata/generated_metrics_test.go index 4ea82d6f71ac..55106e571e49 100644 --- a/receiver/ntpreceiver/internal/metadata/generated_metrics_test.go +++ b/receiver/ntpreceiver/internal/metadata/generated_metrics_test.go @@ -106,8 +106,8 @@ func TestMetricsBuilder(t *testing.T) { dp := ms.At(i).Gauge().DataPoints().At(0) assert.Equal(t, start, dp.StartTimestamp()) assert.Equal(t, ts, dp.Timestamp()) - assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) - assert.Equal(t, int64(1), dp.IntValue()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) } } }) diff --git a/receiver/ntpreceiver/metadata.yaml b/receiver/ntpreceiver/metadata.yaml index c6536b0a05c9..feded55ee316 100644 --- a/receiver/ntpreceiver/metadata.yaml +++ b/receiver/ntpreceiver/metadata.yaml @@ -19,9 +19,7 @@ metrics: description: Time difference between local and NTP server clocks in seconds. unit: "s" gauge: - value_type: int + value_type: double enabled: true -tests: - skip_lifecycle: true - skip_shutdown: true \ No newline at end of file +tests: \ No newline at end of file diff --git a/receiver/ntpreceiver/receiver.go b/receiver/ntpreceiver/receiver.go new file mode 100644 index 000000000000..4f0c4f359991 --- /dev/null +++ b/receiver/ntpreceiver/receiver.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ntpreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/ntpreceiver" + +import ( + "context" + "time" + + "github.com/beevik/ntp" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/ntpreceiver/internal/metadata" +) + +type scraper struct { + logger *zap.Logger + mb *metadata.MetricsBuilder + version int + timeout time.Duration + endpoint string +} + +func (s *scraper) scrape(context.Context) (pmetric.Metrics, error) { + options := ntp.QueryOptions{Version: s.version, Timeout: s.timeout} + response, err := ntp.QueryWithOptions(s.endpoint, options) + if err != nil { + return pmetric.Metrics{}, err + } + clockOffset := response.ClockOffset.Seconds() + s.mb.RecordNtpOffsetDataPoint(pcommon.NewTimestampFromTime(time.Now()), clockOffset) + s.mb.NewResourceBuilder().SetNtpHost(s.endpoint) + return s.mb.Emit(), nil +} + +func newScraper(cfg *Config, settings receiver.Settings) *scraper { + return &scraper{ + logger: settings.TelemetrySettings.Logger, + mb: metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, settings), + version: cfg.Version, + timeout: cfg.ControllerConfig.Timeout, + endpoint: cfg.Endpoint, + } +} From 20ea7b46a7c330ae461822ac55c8d171758fd72b Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Thu, 17 Oct 2024 08:12:35 -0700 Subject: [PATCH 2/2] change from seconds to ns --- receiver/ntpreceiver/documentation.md | 4 ++-- .../ntpreceiver/internal/metadata/generated_metrics.go | 10 +++++----- .../internal/metadata/generated_metrics_test.go | 8 ++++---- receiver/ntpreceiver/metadata.yaml | 6 +++--- receiver/ntpreceiver/receiver.go | 3 +-- 5 files changed, 15 insertions(+), 16 deletions(-) diff --git a/receiver/ntpreceiver/documentation.md b/receiver/ntpreceiver/documentation.md index b999ef23396d..01463c1de0cf 100644 --- a/receiver/ntpreceiver/documentation.md +++ b/receiver/ntpreceiver/documentation.md @@ -14,11 +14,11 @@ metrics: ### ntp.offset -Time difference between local and NTP server clocks in seconds. +Time difference between local and NTP server clocks | Unit | Metric Type | Value Type | | ---- | ----------- | ---------- | -| s | Gauge | Double | +| ns | Gauge | Int | ## Resource Attributes diff --git a/receiver/ntpreceiver/internal/metadata/generated_metrics.go b/receiver/ntpreceiver/internal/metadata/generated_metrics.go index 3c3db06adf0a..0b77e5ddfea6 100644 --- a/receiver/ntpreceiver/internal/metadata/generated_metrics.go +++ b/receiver/ntpreceiver/internal/metadata/generated_metrics.go @@ -21,19 +21,19 @@ type metricNtpOffset struct { // init fills ntp.offset metric with initial data. func (m *metricNtpOffset) init() { m.data.SetName("ntp.offset") - m.data.SetDescription("Time difference between local and NTP server clocks in seconds.") - m.data.SetUnit("s") + m.data.SetDescription("Time difference between local and NTP server clocks") + m.data.SetUnit("ns") m.data.SetEmptyGauge() } -func (m *metricNtpOffset) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) { +func (m *metricNtpOffset) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { if !m.config.Enabled { return } dp := m.data.Gauge().DataPoints().AppendEmpty() dp.SetStartTimestamp(start) dp.SetTimestamp(ts) - dp.SetDoubleValue(val) + dp.SetIntValue(val) } // updateCapacity saves max length of data point slices that will be used for the slice capacity. @@ -210,7 +210,7 @@ func (mb *MetricsBuilder) Emit(options ...ResourceMetricsOption) pmetric.Metrics } // RecordNtpOffsetDataPoint adds a data point to ntp.offset metric. -func (mb *MetricsBuilder) RecordNtpOffsetDataPoint(ts pcommon.Timestamp, val float64) { +func (mb *MetricsBuilder) RecordNtpOffsetDataPoint(ts pcommon.Timestamp, val int64) { mb.metricNtpOffset.recordDataPoint(mb.startTime, ts, val) } diff --git a/receiver/ntpreceiver/internal/metadata/generated_metrics_test.go b/receiver/ntpreceiver/internal/metadata/generated_metrics_test.go index 55106e571e49..4f92fc0d4c33 100644 --- a/receiver/ntpreceiver/internal/metadata/generated_metrics_test.go +++ b/receiver/ntpreceiver/internal/metadata/generated_metrics_test.go @@ -101,13 +101,13 @@ func TestMetricsBuilder(t *testing.T) { validatedMetrics["ntp.offset"] = true assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) - assert.Equal(t, "Time difference between local and NTP server clocks in seconds.", ms.At(i).Description()) - assert.Equal(t, "s", ms.At(i).Unit()) + assert.Equal(t, "Time difference between local and NTP server clocks", ms.At(i).Description()) + assert.Equal(t, "ns", ms.At(i).Unit()) dp := ms.At(i).Gauge().DataPoints().At(0) assert.Equal(t, start, dp.StartTimestamp()) assert.Equal(t, ts, dp.Timestamp()) - assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) - assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) } } }) diff --git a/receiver/ntpreceiver/metadata.yaml b/receiver/ntpreceiver/metadata.yaml index feded55ee316..f54de24bafdd 100644 --- a/receiver/ntpreceiver/metadata.yaml +++ b/receiver/ntpreceiver/metadata.yaml @@ -16,10 +16,10 @@ resource_attributes: metrics: ntp.offset: - description: Time difference between local and NTP server clocks in seconds. - unit: "s" + description: Time difference between local and NTP server clocks + unit: "ns" gauge: - value_type: double + value_type: int enabled: true tests: \ No newline at end of file diff --git a/receiver/ntpreceiver/receiver.go b/receiver/ntpreceiver/receiver.go index 4f0c4f359991..d52605431b46 100644 --- a/receiver/ntpreceiver/receiver.go +++ b/receiver/ntpreceiver/receiver.go @@ -30,8 +30,7 @@ func (s *scraper) scrape(context.Context) (pmetric.Metrics, error) { if err != nil { return pmetric.Metrics{}, err } - clockOffset := response.ClockOffset.Seconds() - s.mb.RecordNtpOffsetDataPoint(pcommon.NewTimestampFromTime(time.Now()), clockOffset) + s.mb.RecordNtpOffsetDataPoint(pcommon.NewTimestampFromTime(time.Now()), response.ClockOffset.Nanoseconds()) s.mb.NewResourceBuilder().SetNtpHost(s.endpoint) return s.mb.Emit(), nil }