From 6740a28a8003a534c20c2608d760edd015657622 Mon Sep 17 00:00:00 2001 From: Chao Weng <19381524+sincejune@users.noreply.github.com> Date: Wed, 22 Jan 2025 12:16:41 +0800 Subject: [PATCH] Support logs scraper (#12116) #### Description This PR added support for logs scraper #### Link to tracking issue Relevant to #11238 #### Testing Added #### Documentation Added --- .chloggen/support-logs-scraper.yaml | 25 ++ scraper/factory.go | 34 +- scraper/factory_test.go | 15 +- scraper/scraperhelper/controller.go | 44 +++ scraper/scraperhelper/controller_test.go | 462 +++++++++++++++++++++-- scraper/scraperhelper/obs_logs.go | 16 +- scraper/scraperhelper/obs_logs_test.go | 13 +- 7 files changed, 558 insertions(+), 51 deletions(-) create mode 100644 .chloggen/support-logs-scraper.yaml diff --git a/.chloggen/support-logs-scraper.yaml b/.chloggen/support-logs-scraper.yaml new file mode 100644 index 00000000000..b6dadfebd77 --- /dev/null +++ b/.chloggen/support-logs-scraper.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: scraper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support logs scraper + +# One or more tracking issues or pull requests related to the change +issues: [12116] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/scraper/factory.go b/scraper/factory.go index 22418ac22be..e801074d0ae 100644 --- a/scraper/factory.go +++ b/scraper/factory.go @@ -29,12 +29,19 @@ type Settings struct { type Factory interface { component.Factory + // CreateLogs creates a Logs scraper based on this config. + // If the scraper type does not support logs, + // this function returns the error [pipeline.ErrSignalNotSupported]. + CreateLogs(ctx context.Context, set Settings, cfg component.Config) (Logs, error) + // CreateMetrics creates a Metrics scraper based on this config. // If the scraper type does not support metrics, // this function returns the error [pipeline.ErrSignalNotSupported]. - // Implementers can assume `next` is never nil. CreateMetrics(ctx context.Context, set Settings, cfg component.Config) (Metrics, error) + // LogsStability gets the stability level of the Logs scraper. + LogsStability() component.StabilityLevel + // MetricsStability gets the stability level of the Metrics scraper. MetricsStability() component.StabilityLevel @@ -59,7 +66,9 @@ func (f factoryOptionFunc) applyOption(o *factory) { type factory struct { cfgType component.Type component.CreateDefaultConfigFunc + CreateLogsFunc CreateMetricsFunc + logsStabilityLevel component.StabilityLevel metricsStabilityLevel component.StabilityLevel } @@ -69,13 +78,28 @@ func (f *factory) Type() component.Type { func (f *factory) unexportedFactoryFunc() {} +func (f *factory) LogsStability() component.StabilityLevel { + return f.logsStabilityLevel +} + func (f *factory) MetricsStability() component.StabilityLevel { return f.metricsStabilityLevel } +// CreateLogsFunc is the equivalent of Factory.CreateLogs(). +type CreateLogsFunc func(context.Context, Settings, component.Config) (Logs, error) + // CreateMetricsFunc is the equivalent of Factory.CreateMetrics(). type CreateMetricsFunc func(context.Context, Settings, component.Config) (Metrics, error) +// CreateLogs implements Factory.CreateLogs. +func (f CreateLogsFunc) CreateLogs(ctx context.Context, set Settings, cfg component.Config) (Logs, error) { + if f == nil { + return nil, pipeline.ErrSignalNotSupported + } + return f(ctx, set, cfg) +} + // CreateMetrics implements Factory.CreateMetrics. func (f CreateMetricsFunc) CreateMetrics(ctx context.Context, set Settings, cfg component.Config) (Metrics, error) { if f == nil { @@ -84,6 +108,14 @@ func (f CreateMetricsFunc) CreateMetrics(ctx context.Context, set Settings, cfg return f(ctx, set, cfg) } +// WithLogs overrides the default "error not supported" implementation for CreateLogs and the default "undefined" stability level. +func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.logsStabilityLevel = sl + o.CreateLogsFunc = createLogs + }) +} + // WithMetrics overrides the default "error not supported" implementation for CreateMetrics and the default "undefined" stability level. func WithMetrics(createMetrics CreateMetricsFunc, sl component.StabilityLevel) FactoryOption { return factoryOptionFunc(func(o *factory) { diff --git a/scraper/factory_test.go b/scraper/factory_test.go index 8a67ca6f4aa..2d80c04132a 100644 --- a/scraper/factory_test.go +++ b/scraper/factory_test.go @@ -31,7 +31,9 @@ func TestNewFactory(t *testing.T) { func() component.Config { return &defaultCfg }) assert.EqualValues(t, testType, f.Type()) assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig()) - _, err := f.CreateMetrics(context.Background(), nopSettings(), &defaultCfg) + _, err := f.CreateLogs(context.Background(), nopSettings(), &defaultCfg) + require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) + _, err = f.CreateMetrics(context.Background(), nopSettings(), &defaultCfg) require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) } @@ -41,12 +43,17 @@ func TestNewFactoryWithOptions(t *testing.T) { f := NewFactory( testType, func() component.Config { return &defaultCfg }, + WithLogs(createLogs, component.StabilityLevelAlpha), WithMetrics(createMetrics, component.StabilityLevelAlpha)) assert.EqualValues(t, testType, f.Type()) assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig()) + assert.Equal(t, component.StabilityLevelAlpha, f.LogsStability()) + _, err := f.CreateLogs(context.Background(), Settings{}, &defaultCfg) + require.NoError(t, err) + assert.Equal(t, component.StabilityLevelAlpha, f.MetricsStability()) - _, err := f.CreateMetrics(context.Background(), Settings{}, &defaultCfg) + _, err = f.CreateMetrics(context.Background(), Settings{}, &defaultCfg) require.NoError(t, err) } @@ -87,6 +94,10 @@ func TestMakeFactoryMap(t *testing.T) { } } +func createLogs(context.Context, Settings, component.Config) (Logs, error) { + return NewLogs(newTestScrapeLogsFunc(nil)) +} + func createMetrics(context.Context, Settings, component.Config) (Metrics, error) { return NewMetrics(newTestScrapeMetricsFunc(nil)) } diff --git a/scraper/scraperhelper/controller.go b/scraper/scraperhelper/controller.go index 8c792cf07b5..257df474ac2 100644 --- a/scraper/scraperhelper/controller.go +++ b/scraper/scraperhelper/controller.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" @@ -179,6 +180,30 @@ func (sc *controller[T]) startScraping() { }() } +// NewLogsController creates a receiver.Logs with the configured options, that can control multiple scraper.Logs. +func NewLogsController(cfg *ControllerConfig, + rSet receiver.Settings, + nextConsumer consumer.Logs, + options ...ControllerOption, +) (receiver.Logs, error) { + co := getOptions(options) + scrapers := make([]scraper.Logs, 0, len(co.factoriesWithConfig)) + for _, fwc := range co.factoriesWithConfig { + set := getSettings(fwc.f.Type(), rSet) + s, err := fwc.f.CreateLogs(context.Background(), set, fwc.cfg) + if err != nil { + return nil, err + } + s, err = wrapObsLogs(s, rSet.ID, set.ID, set.TelemetrySettings) + if err != nil { + return nil, err + } + scrapers = append(scrapers, s) + } + return newController[scraper.Logs]( + cfg, rSet, scrapers, func(c *controller[scraper.Logs]) { scrapeLogs(c, nextConsumer) }, co.tickerCh) +} + // NewMetricsController creates a receiver.Metrics with the configured options, that can control multiple scraper.Metrics. func NewMetricsController(cfg *ControllerConfig, rSet receiver.Settings, @@ -203,6 +228,25 @@ func NewMetricsController(cfg *ControllerConfig, cfg, rSet, scrapers, func(c *controller[scraper.Metrics]) { scrapeMetrics(c, nextConsumer) }, co.tickerCh) } +func scrapeLogs(c *controller[scraper.Logs], nextConsumer consumer.Logs) { + ctx, done := withScrapeContext(c.timeout) + defer done() + + logs := plog.NewLogs() + for i := range c.scrapers { + md, err := c.scrapers[i].ScrapeLogs(ctx) + if err != nil && !scrapererror.IsPartialScrapeError(err) { + continue + } + md.ResourceLogs().MoveAndAppendTo(logs.ResourceLogs()) + } + + logRecordCount := logs.LogRecordCount() + ctx = c.obsrecv.StartMetricsOp(ctx) + err := nextConsumer.ConsumeLogs(ctx, logs) + c.obsrecv.EndMetricsOp(ctx, "", logRecordCount, err) +} + func scrapeMetrics(c *controller[scraper.Metrics], nextConsumer consumer.Metrics) { ctx, done := withScrapeContext(c.timeout) defer done() diff --git a/scraper/scraperhelper/controller_test.go b/scraper/scraperhelper/controller_test.go index 111e82f2d4d..4519f96d2c9 100644 --- a/scraper/scraperhelper/controller_test.go +++ b/scraper/scraperhelper/controller_test.go @@ -16,12 +16,12 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.uber.org/multierr" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" @@ -52,13 +52,26 @@ func (ts *testClose) shutdown(context.Context) error { return ts.err } -type testScrapeMetrics struct { +type testScrape struct { ch chan int timesScrapeCalled int err error } -func (ts *testScrapeMetrics) scrape(context.Context) (pmetric.Metrics, error) { +func (ts *testScrape) scrapeLogs(context.Context) (plog.Logs, error) { + ts.timesScrapeCalled++ + ts.ch <- ts.timesScrapeCalled + + if ts.err != nil { + return plog.Logs{}, ts.err + } + + md := plog.NewLogs() + md.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("") + return md, nil +} + +func (ts *testScrape) scrapeMetrics(context.Context) (pmetric.Metrics, error) { ts.timesScrapeCalled++ ts.ch <- ts.timesScrapeCalled @@ -78,7 +91,7 @@ func newTestNoDelaySettings() *ControllerConfig { } } -type metricsTestCase struct { +type scraperTestCase struct { name string scrapers int @@ -92,8 +105,119 @@ type metricsTestCase struct { closeErr error } -func TestScrapeController(t *testing.T) { - testCases := []metricsTestCase{ +func TestLogsScrapeController(t *testing.T) { + testCases := []scraperTestCase{ + { + name: "NoScrapers", + }, + { + name: "AddLogsScrapersWithCollectionInterval", + scrapers: 2, + expectScraped: true, + }, + { + name: "AddLogsScrapers_ScrapeError", + scrapers: 2, + scrapeErr: errors.New("err1"), + }, + { + name: "AddLogsScrapersWithInitializeAndClose", + scrapers: 2, + initialize: true, + expectScraped: true, + close: true, + }, + { + name: "AddLogsScrapersWithInitializeAndCloseErrors", + scrapers: 2, + initialize: true, + close: true, + initializeErr: errors.New("err1"), + closeErr: errors.New("err2"), + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + receiverID := component.MustNewID("receiver") + tt := metadatatest.SetupTelemetry() + tel := tt.NewTelemetrySettings() + + _, parentSpan := tel.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + initializeChs := make([]chan bool, test.scrapers) + scrapeLogsChs := make([]chan int, test.scrapers) + closeChs := make([]chan bool, test.scrapers) + options := configureLogOptions(t, test, initializeChs, scrapeLogsChs, closeChs) + + tickerCh := make(chan time.Time) + options = append(options, WithTickerChannel(tickerCh)) + + sink := new(consumertest.LogsSink) + cfg := newTestNoDelaySettings() + if test.scraperControllerSettings != nil { + cfg = test.scraperControllerSettings + } + + mr, err := NewLogsController(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: tel, BuildInfo: component.NewDefaultBuildInfo()}, sink, options...) + require.NoError(t, err) + + err = mr.Start(context.Background(), componenttest.NewNopHost()) + expectedStartErr := getExpectedStartErr(test) + if expectedStartErr != nil { + assert.Equal(t, expectedStartErr, err) + } else if test.initialize { + assertChannelsCalled(t, initializeChs, "start was not called") + } + + const iterations = 5 + + if test.expectScraped || test.scrapeErr != nil { + // validate that scrape is called at least N times for each configured scraper + for _, ch := range scrapeLogsChs { + <-ch + } + // Consume the initial scrapes on start + for i := 0; i < iterations; i++ { + tickerCh <- time.Now() + + for _, ch := range scrapeLogsChs { + <-ch + } + } + + // wait until all calls to scrape have completed + if test.scrapeErr == nil { + require.Eventually(t, func() bool { + return sink.LogRecordCount() == (1+iterations)*(test.scrapers) + }, time.Second, time.Millisecond) + } + + if test.expectScraped { + assert.GreaterOrEqual(t, sink.LogRecordCount(), iterations) + } + + spans := tt.SpanRecorder.Ended() + assertReceiverSpan(t, spans) + assertScraperSpan(t, test.scrapeErr, spans, "scraper/scraper/ScrapeLogs") + assertLogsScraperObsMetrics(t, tt, receiverID, component.MustNewID("scraper"), test.scrapeErr, sink) + } + + err = mr.Shutdown(context.Background()) + expectedShutdownErr := getExpectedShutdownErr(test) + if expectedShutdownErr != nil { + assert.EqualError(t, err, expectedShutdownErr.Error()) + } else if test.close { + assertChannelsCalled(t, closeChs, "shutdown was not called") + } + }) + } +} + +func TestMetricsScrapeController(t *testing.T) { + testCases := []scraperTestCase{ { name: "NoScrapers", }, @@ -128,11 +252,7 @@ func TestScrapeController(t *testing.T) { t.Run(test.name, func(t *testing.T) { receiverID := component.MustNewID("receiver") tt := metadatatest.SetupTelemetry() - tel := tt.NewTelemetrySettings() - // TODO: Add capability for tracing testing in metadatatest. - spanRecorder := new(tracetest.SpanRecorder) - tel.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(spanRecorder)) _, parentSpan := tel.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) defer parentSpan.End() @@ -190,10 +310,10 @@ func TestScrapeController(t *testing.T) { assert.GreaterOrEqual(t, sink.DataPointCount(), iterations) } - spans := spanRecorder.Ended() + spans := tt.SpanRecorder.Ended() assertReceiverSpan(t, spans) - assertScraperSpan(t, test.scrapeErr, spans) - assertMetrics(t, tt, receiverID, component.MustNewID("scraper"), test.scrapeErr, sink) + assertScraperSpan(t, test.scrapeErr, spans, "scraper/scraper/ScrapeMetrics") + assertMetricsScraperObsMetrics(t, tt, receiverID, component.MustNewID("scraper"), test.scrapeErr, sink) } err = mr.Shutdown(context.Background()) @@ -207,7 +327,34 @@ func TestScrapeController(t *testing.T) { } } -func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ControllerOption { +func configureLogOptions(t *testing.T, test scraperTestCase, initializeChs []chan bool, scrapeLogsChs []chan int, closeChs []chan bool) []ControllerOption { + var logsOptions []ControllerOption + + for i := 0; i < test.scrapers; i++ { + var scraperOptions []scraper.Option + if test.initialize { + initializeChs[i] = make(chan bool, 1) + ti := &testInitialize{ch: initializeChs[i], err: test.initializeErr} + scraperOptions = append(scraperOptions, scraper.WithStart(ti.start)) + } + if test.close { + closeChs[i] = make(chan bool, 1) + tc := &testClose{ch: closeChs[i], err: test.closeErr} + scraperOptions = append(scraperOptions, scraper.WithShutdown(tc.shutdown)) + } + + scrapeLogsChs[i] = make(chan int) + ts := &testScrape{ch: scrapeLogsChs[i], err: test.scrapeErr} + scp, err := scraper.NewLogs(ts.scrapeLogs, scraperOptions...) + require.NoError(t, err) + + logsOptions = append(logsOptions, addLogsScraper(component.MustNewType("scraper"), scp)) + } + + return logsOptions +} + +func configureMetricOptions(t *testing.T, test scraperTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ControllerOption { var metricOptions []ControllerOption for i := 0; i < test.scrapers; i++ { @@ -224,8 +371,8 @@ func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs [] } scrapeMetricsChs[i] = make(chan int) - tsm := &testScrapeMetrics{ch: scrapeMetricsChs[i], err: test.scrapeErr} - scp, err := scraper.NewMetrics(tsm.scrape, scraperOptions...) + ts := &testScrape{ch: scrapeMetricsChs[i], err: test.scrapeErr} + scp, err := scraper.NewMetrics(ts.scrapeMetrics, scraperOptions...) require.NoError(t, err) metricOptions = append(metricOptions, AddScraper(component.MustNewType("scraper"), scp)) @@ -234,11 +381,11 @@ func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs [] return metricOptions } -func getExpectedStartErr(test metricsTestCase) error { +func getExpectedStartErr(test scraperTestCase) error { return test.initializeErr } -func getExpectedShutdownErr(test metricsTestCase) error { +func getExpectedShutdownErr(test scraperTestCase) error { var errs error if test.closeErr != nil { @@ -275,7 +422,7 @@ func assertReceiverSpan(t *testing.T, spans []sdktrace.ReadOnlySpan) { assert.True(t, receiverSpan) } -func assertScraperSpan(t *testing.T, expectedErr error, spans []sdktrace.ReadOnlySpan) { +func assertScraperSpan(t *testing.T, expectedErr error, spans []sdktrace.ReadOnlySpan, expectedSpanName string) { expectedStatusCode := codes.Unset expectedStatusMessage := "" if expectedErr != nil { @@ -285,7 +432,7 @@ func assertScraperSpan(t *testing.T, expectedErr error, spans []sdktrace.ReadOnl scraperSpan := false for _, span := range spans { - if span.Name() == "scraper/scraper/ScrapeMetrics" { + if span.Name() == expectedSpanName { scraperSpan = true assert.Equal(t, expectedStatusCode, span.Status().Code) assert.Equal(t, expectedStatusMessage, span.Status().Description) @@ -295,7 +442,97 @@ func assertScraperSpan(t *testing.T, expectedErr error, spans []sdktrace.ReadOnl assert.True(t, scraperSpan) } -func assertMetrics(t *testing.T, tt metadatatest.Telemetry, receiver component.ID, scraper component.ID, expectedErr error, sink *consumertest.MetricsSink) { +func assertLogsScraperObsMetrics(t *testing.T, tt metadatatest.Telemetry, receiver component.ID, scraper component.ID, expectedErr error, sink *consumertest.LogsSink) { + logRecordCounts := 0 + for _, md := range sink.AllLogs() { + logRecordCounts += md.LogRecordCount() + } + + expectedScraped := int64(sink.LogRecordCount()) + expectedErrored := int64(0) + if expectedErr != nil { + var partialError scrapererror.PartialScrapeError + if errors.As(expectedErr, &partialError) { + expectedErrored = int64(partialError.Failed) + } else { + expectedScraped = int64(0) + expectedErrored = int64(sink.LogRecordCount()) + } + } + + tt.AssertMetrics(t, []metricdata.Metrics{ + { + Name: "otelcol_receiver_accepted_metric_points", + Description: "Number of metric points successfully pushed into the pipeline. [alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(receiverKey, receiver.String()), + attribute.String(transportTag, "")), + Value: int64(logRecordCounts), + }, + }, + }, + }, + { + Name: "otelcol_receiver_refused_metric_points", + Description: "Number of metric points that could not be pushed into the pipeline. [alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(receiverKey, receiver.String()), + attribute.String(transportTag, "")), + Value: 0, + }, + }, + }, + }, + { + Name: "otelcol_scraper_scraped_log_records", + Description: "Number of log records successfully scraped. [alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(receiverKey, receiver.String()), + attribute.String(scraperKey, scraper.String())), + Value: expectedScraped, + }, + }, + }, + }, + { + Name: "otelcol_scraper_errored_log_records", + Description: "Number of log records that were unable to be scraped. [alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(receiverKey, receiver.String()), + attribute.String(scraperKey, scraper.String())), + Value: expectedErrored, + }, + }, + }, + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) +} + +func assertMetricsScraperObsMetrics(t *testing.T, tt metadatatest.Telemetry, receiver component.ID, scraper component.ID, expectedErr error, sink *consumertest.MetricsSink) { dataPointCounts := 0 for _, md := range sink.AllMetrics() { dataPointCounts += md.DataPointCount() @@ -385,22 +622,65 @@ func assertMetrics(t *testing.T, tt metadatatest.Telemetry, receiver component.I }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) } -func TestSingleScrapePerInterval(t *testing.T) { - scrapeMetricsCh := make(chan int, 10) - tsm := &testScrapeMetrics{ch: scrapeMetricsCh} +func TestSingleLogsScraperPerInterval(t *testing.T) { + scrapeCh := make(chan int, 10) + ts := &testScrape{ch: scrapeCh} + + cfg := newTestNoDelaySettings() + + tickerCh := make(chan time.Time) + + scp, err := scraper.NewLogs(ts.scrapeLogs) + require.NoError(t, err) + + recv, err := NewLogsController( + cfg, + receivertest.NewNopSettings(), + new(consumertest.LogsSink), + addLogsScraper(component.MustNewType("scraper"), scp), + WithTickerChannel(tickerCh), + ) + require.NoError(t, err) + + require.NoError(t, recv.Start(context.Background(), componenttest.NewNopHost())) + defer func() { require.NoError(t, recv.Shutdown(context.Background())) }() + + tickerCh <- time.Now() + + assert.Eventually( + t, + func() bool { + return <-scrapeCh == 2 + }, + 300*time.Millisecond, + 100*time.Millisecond, + "Make sure the scraper channel is called twice", + ) + + select { + case <-scrapeCh: + assert.Fail(t, "Scrape was called more than twice") + case <-time.After(100 * time.Millisecond): + return + } +} + +func TestSingleMetricsScraperPerInterval(t *testing.T) { + scrapeCh := make(chan int, 10) + ts := &testScrape{ch: scrapeCh} cfg := newTestNoDelaySettings() tickerCh := make(chan time.Time) - scp, err := scraper.NewMetrics(tsm.scrape) + scp, err := scraper.NewMetrics(ts.scrapeMetrics) require.NoError(t, err) recv, err := NewMetricsController( cfg, receivertest.NewNopSettings(), new(consumertest.MetricsSink), - AddScraper(component.MustNewType("scaper"), scp), + AddScraper(component.MustNewType("scraper"), scp), WithTickerChannel(tickerCh), ) require.NoError(t, err) @@ -413,7 +693,7 @@ func TestSingleScrapePerInterval(t *testing.T) { assert.Eventually( t, func() bool { - return <-scrapeMetricsCh == 2 + return <-scrapeCh == 2 }, 300*time.Millisecond, 100*time.Millisecond, @@ -421,21 +701,48 @@ func TestSingleScrapePerInterval(t *testing.T) { ) select { - case <-scrapeMetricsCh: + case <-scrapeCh: assert.Fail(t, "Scrape was called more than twice") case <-time.After(100 * time.Millisecond): return } } -func TestScrapeControllerStartsOnInit(t *testing.T) { +func TestLogsScraperControllerStartsOnInit(t *testing.T) { + t.Parallel() + + ts := &testScrape{ + ch: make(chan int, 1), + } + + scp, err := scraper.NewLogs(ts.scrapeLogs) + require.NoError(t, err, "Must not error when creating scraper") + + r, err := NewLogsController( + &ControllerConfig{ + CollectionInterval: time.Hour, + InitialDelay: 0, + }, + receivertest.NewNopSettings(), + new(consumertest.LogsSink), + addLogsScraper(component.MustNewType("scraper"), scp), + ) + require.NoError(t, err, "Must not error when creating scrape controller") + + assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Must not error on start") + <-time.After(500 * time.Nanosecond) + require.NoError(t, r.Shutdown(context.Background()), "Must not have errored on shutdown") + assert.Equal(t, 1, ts.timesScrapeCalled, "Must have been called as soon as the controller started") +} + +func TestMetricsScraperControllerStartsOnInit(t *testing.T) { t.Parallel() - tsm := &testScrapeMetrics{ + ts := &testScrape{ ch: make(chan int, 1), } - scp, err := scraper.NewMetrics(tsm.scrape) + scp, err := scraper.NewMetrics(ts.scrapeMetrics) require.NoError(t, err, "Must not error when creating scraper") r, err := NewMetricsController( @@ -445,17 +752,56 @@ func TestScrapeControllerStartsOnInit(t *testing.T) { }, receivertest.NewNopSettings(), new(consumertest.MetricsSink), - AddScraper(component.MustNewType("scaper"), scp), + AddScraper(component.MustNewType("scraper"), scp), ) require.NoError(t, err, "Must not error when creating scrape controller") assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Must not error on start") <-time.After(500 * time.Nanosecond) require.NoError(t, r.Shutdown(context.Background()), "Must not have errored on shutdown") - assert.Equal(t, 1, tsm.timesScrapeCalled, "Must have been called as soon as the controller started") + assert.Equal(t, 1, ts.timesScrapeCalled, "Must have been called as soon as the controller started") +} + +func TestLogsScraperControllerInitialDelay(t *testing.T) { + if testing.Short() { + t.Skip("This requires real time to pass, skipping") + return + } + + t.Parallel() + + var ( + elapsed = make(chan time.Time, 1) + cfg = ControllerConfig{ + CollectionInterval: time.Second, + InitialDelay: 300 * time.Millisecond, + } + ) + + scp, err := scraper.NewLogs(func(context.Context) (plog.Logs, error) { + elapsed <- time.Now() + return plog.NewLogs(), nil + }) + require.NoError(t, err, "Must not error when creating scraper") + + r, err := NewLogsController( + &cfg, + receivertest.NewNopSettings(), + new(consumertest.LogsSink), + addLogsScraper(component.MustNewType("scraper"), scp), + ) + require.NoError(t, err, "Must not error when creating receiver") + + t0 := time.Now() + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Must not error when starting") + t1 := <-elapsed + + assert.GreaterOrEqual(t, t1.Sub(t0), 300*time.Millisecond, "Must have had 300ms pass as defined by initial delay") + + assert.NoError(t, r.Shutdown(context.Background()), "Must not error closing down") } -func TestScrapeControllerInitialDelay(t *testing.T) { +func TestMetricsScraperControllerInitialDelay(t *testing.T) { if testing.Short() { t.Skip("This requires real time to pass, skipping") return @@ -481,7 +827,7 @@ func TestScrapeControllerInitialDelay(t *testing.T) { &cfg, receivertest.NewNopSettings(), new(consumertest.MetricsSink), - AddScraper(component.MustNewType("scaper"), scp), + AddScraper(component.MustNewType("scraper"), scp), ) require.NoError(t, err, "Must not error when creating receiver") @@ -494,7 +840,41 @@ func TestScrapeControllerInitialDelay(t *testing.T) { assert.NoError(t, r.Shutdown(context.Background()), "Must not error closing down") } -func TestShutdownBeforeScrapeCanStart(t *testing.T) { +func TestLogsScraperShutdownBeforeScrapeCanStart(t *testing.T) { + cfg := ControllerConfig{ + CollectionInterval: time.Second, + InitialDelay: 5 * time.Second, + } + + scp, err := scraper.NewLogs(func(context.Context) (plog.Logs, error) { + // make the scraper wait for long enough it would disrupt a shutdown. + time.Sleep(30 * time.Second) + return plog.NewLogs(), nil + }) + require.NoError(t, err, "Must not error when creating scraper") + + r, err := NewLogsController( + &cfg, + receivertest.NewNopSettings(), + new(consumertest.LogsSink), + addLogsScraper(component.MustNewType("scraper"), scp), + ) + require.NoError(t, err, "Must not error when creating receiver") + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + shutdown := make(chan struct{}, 1) + go func() { + assert.NoError(t, r.Shutdown(context.Background())) + close(shutdown) + }() + timer := time.NewTicker(10 * time.Second) + select { + case <-timer.C: + require.Fail(t, "shutdown should not wait for scraping") + case <-shutdown: + } +} + +func TestMetricsScraperShutdownBeforeScrapeCanStart(t *testing.T) { cfg := ControllerConfig{ CollectionInterval: time.Second, InitialDelay: 5 * time.Second, @@ -511,7 +891,7 @@ func TestShutdownBeforeScrapeCanStart(t *testing.T) { &cfg, receivertest.NewNopSettings(), new(consumertest.MetricsSink), - AddScraper(component.MustNewType("scaper"), scp), + AddScraper(component.MustNewType("scraper"), scp), ) require.NoError(t, err, "Must not error when creating receiver") require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) @@ -527,3 +907,11 @@ func TestShutdownBeforeScrapeCanStart(t *testing.T) { case <-shutdown: } } + +func addLogsScraper(t component.Type, sc scraper.Logs) ControllerOption { + f := scraper.NewFactory(t, nil, + scraper.WithLogs(func(context.Context, scraper.Settings, component.Config) (scraper.Logs, error) { + return sc, nil + }, component.StabilityLevelAlpha)) + return AddFactoryWithConfig(f, nil) +} diff --git a/scraper/scraperhelper/obs_logs.go b/scraper/scraperhelper/obs_logs.go index b05a72e76f6..88fd252a969 100644 --- a/scraper/scraperhelper/obs_logs.go +++ b/scraper/scraperhelper/obs_logs.go @@ -29,28 +29,28 @@ const ( erroredLogRecordsKey = "errored_log_records" ) -func newObsLogs(delegate scraper.ScrapeLogsFunc, receiverID component.ID, scraperID component.ID, telSettings component.TelemetrySettings) (scraper.ScrapeLogsFunc, error) { - telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(telSettings) +func wrapObsLogs(sc scraper.Logs, receiverID component.ID, scraperID component.ID, set component.TelemetrySettings) (scraper.Logs, error) { + telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(set) if errBuilder != nil { return nil, errBuilder } - tracer := metadata.Tracer(telSettings) + tracer := metadata.Tracer(set) spanName := scraperKey + spanNameSep + scraperID.String() + spanNameSep + "ScrapeLogs" otelAttrs := metric.WithAttributeSet(attribute.NewSet( attribute.String(receiverKey, receiverID.String()), attribute.String(scraperKey, scraperID.String()), )) - return func(ctx context.Context) (plog.Logs, error) { + scraperFuncs := func(ctx context.Context) (plog.Logs, error) { ctx, span := tracer.Start(ctx, spanName) defer span.End() - md, err := delegate(ctx) + md, err := sc.ScrapeLogs(ctx) numScrapedLogs := 0 numErroredLogs := 0 if err != nil { - telSettings.Logger.Error("Error scraping logs", zap.Error(err)) + set.Logger.Error("Error scraping logs", zap.Error(err)) var partialErr scrapererror.PartialScrapeError if errors.As(err, &partialErr) { numErroredLogs = partialErr.Failed @@ -77,5 +77,7 @@ func newObsLogs(delegate scraper.ScrapeLogsFunc, receiverID component.ID, scrape } return md, err - }, nil + } + + return scraper.NewLogs(scraperFuncs, scraper.WithStart(sc.Start), scraper.WithShutdown(sc.Shutdown)) } diff --git a/scraper/scraperhelper/obs_logs_test.go b/scraper/scraperhelper/obs_logs_test.go index 9c0d3a6c71b..4b689db8a85 100644 --- a/scraper/scraperhelper/obs_logs_test.go +++ b/scraper/scraperhelper/obs_logs_test.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/testdata" + "go.opentelemetry.io/collector/scraper" "go.opentelemetry.io/collector/scraper/scraperhelper/internal/metadatatest" ) @@ -36,9 +37,11 @@ func TestScrapeLogsDataOp(t *testing.T) { {items: 15, err: nil}, } for i := range params { - sf, err := newObsLogs(func(context.Context) (plog.Logs, error) { + sm, err := scraper.NewLogs(func(context.Context) (plog.Logs, error) { return testdata.GenerateLogs(params[i].items), params[i].err - }, receiverID, scraperID, tel) + }) + require.NoError(t, err) + sf, err := wrapObsLogs(sm, receiverID, scraperID, tel) require.NoError(t, err) _, err = sf.ScrapeLogs(parentCtx) require.ErrorIs(t, err, params[i].err) @@ -81,9 +84,11 @@ func TestCheckScraperLogs(t *testing.T) { tt := metadatatest.SetupTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - sf, err := newObsLogs(func(context.Context) (plog.Logs, error) { + sm, err := scraper.NewLogs(func(context.Context) (plog.Logs, error) { return testdata.GenerateLogs(7), nil - }, receiverID, scraperID, tt.NewTelemetrySettings()) + }) + require.NoError(t, err) + sf, err := wrapObsLogs(sm, receiverID, scraperID, tt.NewTelemetrySettings()) require.NoError(t, err) _, err = sf.ScrapeLogs(context.Background()) require.NoError(t, err)