diff --git a/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go index 4e673dc938fbc..ddbd749ddddcf 100644 --- a/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go +++ b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go @@ -415,19 +415,25 @@ func newHTTPPassthroughPipeline( pipelineMonitor := metrics.NewNoopPipelineMonitor(strconv.Itoa(pipelineID)) - reliable := []client.Destination{} - for i, endpoint := range endpoints.GetReliableEndpoints() { - destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "reliable", strconv.Itoa(i)) - reliable = append(reliable, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor)) - } - additionals := []client.Destination{} - for i, endpoint := range endpoints.GetUnReliableEndpoints() { - destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "unreliable", strconv.Itoa(i)) - additionals = append(additionals, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor)) - } - destinations := client.NewDestinations(reliable, additionals) inputChan := make(chan *message.Message, endpoints.InputChanSize) - senderInput := make(chan *message.Payload, 1) // Only buffer 1 message since payloads can be large + + a := auditor.NewNullAuditor() + senderImpl := sender.NewSender( + coreConfig, + a, + 10, // Buffer Size + nil, // senderDoneChan, required only for serverless + nil, // flushWg, required only for serverless + endpoints, + destinationsContext, + nil, // status, required only for tcp endpoint support + false, + desc.eventType, + desc.contentType, + sender.DefaultWorkerCount, + endpoints.BatchMaxConcurrentSend, + endpoints.BatchMaxConcurrentSend, + ) var encoder compressioncommon.Compressor encoder = compressor.NewCompressor("none", 0) @@ -437,10 +443,10 @@ func newHTTPPassthroughPipeline( var strategy sender.Strategy if desc.contentType == logshttp.ProtobufContentType { - strategy = sender.NewStreamStrategy(inputChan, senderInput, encoder) + strategy = sender.NewStreamStrategy(inputChan, senderImpl.In(), encoder) } else { strategy = sender.NewBatchStrategy(inputChan, - senderInput, + senderImpl.In(), make(chan struct{}), false, nil, @@ -453,11 +459,10 @@ func newHTTPPassthroughPipeline( pipelineMonitor) } - a := auditor.NewNullAuditor() log.Debugf("Initialized event platform forwarder pipeline. 
eventType=%s mainHosts=%s additionalHosts=%s batch_max_concurrent_send=%d batch_max_content_size=%d batch_max_size=%d, input_chan_size=%d", desc.eventType, joinHosts(endpoints.GetReliableEndpoints()), joinHosts(endpoints.GetUnReliableEndpoints()), endpoints.BatchMaxConcurrentSend, endpoints.BatchMaxContentSize, endpoints.BatchMaxSize, endpoints.InputChanSize) return &passthroughPipeline{ - sender: sender.NewSender(coreConfig, senderInput, a.Channel(), destinations, 10, nil, nil, pipelineMonitor), + sender: senderImpl, strategy: strategy, in: inputChan, auditor: a, diff --git a/comp/logs/agent/agentimpl/agent_core_init.go b/comp/logs/agent/agentimpl/agent_core_init.go index 87e2b00810b24..9227b92a35bb2 100644 --- a/comp/logs/agent/agentimpl/agent_core_init.go +++ b/comp/logs/agent/agentimpl/agent_core_init.go @@ -46,7 +46,9 @@ func (a *logAgent) SetupPipeline(processingRules []*config.ProcessingRule, wmeta diagnosticMessageReceiver := diagnostic.NewBufferedMessageReceiver(nil, a.hostname) // setup the pipeline provider that provides pairs of processor and sender - pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"), auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config, a.compression) + pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"), + auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, + NewStatusProvider(), a.hostname, a.config, a.compression) // setup the launchers lnchrs := launchers.NewLaunchers(a.sources, pipelineProvider, auditor, a.tracker) @@ -78,7 +80,6 @@ func (a *logAgent) SetupPipeline(processingRules []*config.ProcessingRule, wmeta a.launchers = lnchrs a.health = health a.diagnosticMessageReceiver = diagnosticMessageReceiver - } // buildEndpoints builds endpoints for the logs agent diff --git a/comp/logs/agent/agentimpl/agent_serverless_init.go b/comp/logs/agent/agentimpl/agent_serverless_init.go index dabe8ab060135..ea784272f465d 100644 --- a/comp/logs/agent/agentimpl/agent_serverless_init.go +++ b/comp/logs/agent/agentimpl/agent_serverless_init.go @@ -49,7 +49,19 @@ func (a *logAgent) SetupPipeline( destinationsCtx := client.NewDestinationsContext() // setup the pipeline provider that provides pairs of processor and sender - pipelineProvider := pipeline.NewServerlessProvider(a.config.GetInt("logs_config.pipelines"), a.auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config, a.compression) + pipelineProvider := pipeline.NewLegacyProvider( + a.config.GetInt("logs_config.pipelines"), + a.auditor, + diagnosticMessageReceiver, + processingRules, + a.endpoints, + destinationsCtx, + NewStatusProvider(), + a.hostname, + a.config, + a.compression, + true, + ) lnchrs := launchers.NewLaunchers(a.sources, pipelineProvider, a.auditor, a.tracker) lnchrs.AddLauncher(channel.NewLauncher()) diff --git a/comp/logs/agent/agentimpl/agent_test.go b/comp/logs/agent/agentimpl/agent_test.go index 5a139971fbad6..d5d26a95469a4 100644 --- a/comp/logs/agent/agentimpl/agent_test.go +++ b/comp/logs/agent/agentimpl/agent_test.go @@ -10,6 +10,7 @@ package agentimpl import ( "bytes" "context" + "expvar" "fmt" "os" "strings" @@ -158,7 +159,9 @@ func (suite *AgentTestSuite) testAgent(endpoints *config.Endpoints) { assert.Equal(suite.T(), zero, metrics.LogsProcessed.Value()) assert.Equal(suite.T(), zero, metrics.LogsSent.Value()) assert.Equal(suite.T(), zero, 
metrics.DestinationErrors.Value()) - assert.Equal(suite.T(), "{}", metrics.DestinationLogsDropped.String()) + metrics.DestinationLogsDropped.Do(func(k expvar.KeyValue) { + assert.Equal(suite.T(), k.Value.String(), "0") + }) agent.startPipeline() sources.AddSource(suite.source) diff --git a/comp/logs/agent/config/config_test.go b/comp/logs/agent/config/config_test.go index 0d3c8df9642dc..b3fa78a5cb29e 100644 --- a/comp/logs/agent/config/config_test.go +++ b/comp/logs/agent/config/config_test.go @@ -10,11 +10,12 @@ import ( "testing" "time" - "github.com/DataDog/datadog-agent/comp/core/config" - pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "go.uber.org/atomic" + + "github.com/DataDog/datadog-agent/comp/core/config" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" ) type ConfigTestSuite struct { diff --git a/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/agent.go b/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/agent.go index 9da32f7bd6739..4f542758ab9e8 100644 --- a/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/agent.go +++ b/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/agent.go @@ -214,7 +214,9 @@ func (a *Agent) SetupPipeline( destinationsCtx := client.NewDestinationsContext() // setup the pipeline provider that provides pairs of processor and sender - pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"), auditor, &diagnostic.NoopMessageReceiver{}, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config, a.compression) + pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"), auditor, + &diagnostic.NoopMessageReceiver{}, processingRules, a.endpoints, + destinationsCtx, NewStatusProvider(), a.hostname, a.config, a.compression) a.auditor = auditor a.destinationsCtx = destinationsCtx diff --git a/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/agent_test.go b/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/agent_test.go index 50d225033a4b3..a2b1fd99334e5 100644 --- a/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/agent_test.go +++ b/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/agent_test.go @@ -7,6 +7,7 @@ package logsagentpipelineimpl import ( "context" + "expvar" "testing" "time" @@ -97,7 +98,9 @@ func (suite *AgentTestSuite) testAgent(endpoints *config.Endpoints) { assert.Equal(suite.T(), zero, metrics.LogsProcessed.Value()) assert.Equal(suite.T(), zero, metrics.LogsSent.Value()) assert.Equal(suite.T(), zero, metrics.DestinationErrors.Value()) - assert.Equal(suite.T(), "{}", metrics.DestinationLogsDropped.String()) + metrics.DestinationLogsDropped.Do(func(k expvar.KeyValue) { + assert.Equal(suite.T(), k.Value.String(), "0") + }) agent.startPipeline() suite.sendTestMessages(agent) diff --git a/comp/otelcol/otlp/collector_test.go b/comp/otelcol/otlp/collector_test.go index 6134d8a6a9a47..dd8cc861f12fc 100644 --- a/comp/otelcol/otlp/collector_test.go +++ b/comp/otelcol/otlp/collector_test.go @@ -33,7 +33,6 @@ func TestGetComponents(t *testing.T) { func AssertSucessfulRun(t *testing.T, pcfg PipelineConfig) { fakeTagger := mock.SetupFakeTagger(t) - p, err := NewPipeline(pcfg, serializermock.NewMetricSerializer(t), make(chan *message.Message), fakeTagger) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) diff --git a/pkg/compliance/reporter.go b/pkg/compliance/reporter.go index 
2561788c6aaa4..202ebac4bcd90 100644 --- a/pkg/compliance/reporter.go +++ b/pkg/compliance/reporter.go @@ -44,7 +44,19 @@ func NewLogReporter(hostname string, sourceName, sourceType string, endpoints *c auditor.Start() // setup the pipeline provider that provides pairs of processor and sender - pipelineProvider := pipeline.NewProvider(4, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, dstcontext, &common.NoopStatusProvider{}, hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog(), compression) + pipelineProvider := pipeline.NewLegacyProvider( + 4, + auditor, + &diagnostic.NoopMessageReceiver{}, + nil, + endpoints, + dstcontext, + &common.NoopStatusProvider{}, + hostnameimpl.NewHostnameService(), + pkgconfigsetup.Datadog(), + compression, + false, + ) pipelineProvider.Start() logSource := sources.NewLogSource( diff --git a/pkg/logs/client/http/destination.go b/pkg/logs/client/http/destination.go index 71af3e9ae5c9c..289e58e72c0aa 100644 --- a/pkg/logs/client/http/destination.go +++ b/pkg/logs/client/http/destination.go @@ -56,6 +56,11 @@ var ( //nolint:revive // TODO(AML) Fix revive linter var emptyJsonPayload = message.Payload{Messages: []*message.Message{}, Encoded: []byte("{}")} +type destinationResult struct { + latency time.Duration + err error +} + // Destination sends a payload over HTTP. type Destination struct { // Config @@ -70,8 +75,8 @@ type Destination struct { isMRF bool // Concurrency - climit chan struct{} // semaphore for limiting concurrent background sends - wg sync.WaitGroup + senderPool SenderPool + wg sync.WaitGroup // Retry backoff backoff.Policy @@ -94,20 +99,22 @@ type Destination struct { func NewDestination(endpoint config.Endpoint, contentType string, destinationsContext *client.DestinationsContext, - maxConcurrentBackgroundSends int, shouldRetry bool, destMeta *client.DestinationMetadata, cfg pkgconfigmodel.Reader, + minConcurrency int, + maxConcurrency int, pipelineMonitor metrics.PipelineMonitor) *Destination { return newDestination(endpoint, contentType, destinationsContext, time.Second*10, - maxConcurrentBackgroundSends, shouldRetry, destMeta, cfg, + minConcurrency, + maxConcurrency, pipelineMonitor) } @@ -115,15 +122,13 @@ func newDestination(endpoint config.Endpoint, contentType string, destinationsContext *client.DestinationsContext, timeout time.Duration, - maxConcurrentBackgroundSends int, shouldRetry bool, destMeta *client.DestinationMetadata, cfg pkgconfigmodel.Reader, + minConcurrency int, + maxConcurrency int, pipelineMonitor metrics.PipelineMonitor) *Destination { - if maxConcurrentBackgroundSends <= 0 { - maxConcurrentBackgroundSends = 1 - } policy := backoff.NewExpBackoffPolicy( endpoint.BackoffFactor, endpoint.BackoffBase, @@ -140,6 +145,8 @@ func newDestination(endpoint config.Endpoint, metrics.DestinationExpVars.Set(destMeta.TelemetryName(), expVars) } + senderPool := NewSenderPool(minConcurrency, maxConcurrency, destMeta) + return &Destination{ host: endpoint.Host, url: buildURL(endpoint), @@ -147,7 +154,7 @@ func newDestination(endpoint config.Endpoint, contentType: contentType, client: httputils.NewResetClient(endpoint.ConnectionResetInterval, httpClientFactory(timeout, cfg)), destinationsContext: destinationsContext, - climit: make(chan struct{}, maxConcurrentBackgroundSends), + senderPool: senderPool, wg: sync.WaitGroup{}, backoff: policy, protocol: endpoint.Protocol, @@ -222,20 +229,16 @@ func (d *Destination) run(input chan *message.Payload, output chan *message.Payl func (d *Destination) sendConcurrent(payload 
*message.Payload, output chan *message.Payload, isRetrying chan bool) { d.wg.Add(1) - d.climit <- struct{}{} - go func() { - defer func() { - <-d.climit - d.wg.Done() - }() - d.sendAndRetry(payload, output, isRetrying) - }() + d.senderPool.Run(func() destinationResult { + result := d.sendAndRetry(payload, output, isRetrying) + d.wg.Done() + return result + }) } // Send sends a payload over HTTP, -func (d *Destination) sendAndRetry(payload *message.Payload, output chan *message.Payload, isRetrying chan bool) { +func (d *Destination) sendAndRetry(payload *message.Payload, output chan *message.Payload, isRetrying chan bool) destinationResult { for { - d.retryLock.Lock() nbErrors := d.nbErrors d.retryLock.Unlock() @@ -249,7 +252,13 @@ func (d *Destination) sendAndRetry(payload *message.Payload, output chan *messag metrics.TlmRetryCount.Add(1) } + start := time.Now() err := d.unconditionalSend(payload) + latency := time.Since(start) + result := destinationResult{ + latency: latency, + err: err, + } if err != nil { metrics.DestinationErrors.Add(1) @@ -265,7 +274,7 @@ func (d *Destination) sendAndRetry(payload *message.Payload, output chan *messag if err == context.Canceled { d.updateRetryState(nil, isRetrying) - return + return result } if d.shouldRetry { @@ -277,7 +286,7 @@ func (d *Destination) sendAndRetry(payload *message.Payload, output chan *messag metrics.LogsSent.Add(int64(len(payload.Messages))) metrics.TlmLogsSent.Add(float64(len(payload.Messages))) output <- payload - return + return result } } @@ -322,7 +331,6 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) { req = req.WithContext(ctx) resp, err := d.client.Do(req) - latency := time.Since(then).Milliseconds() metrics.TlmSenderLatency.Observe(float64(latency)) metrics.SenderLatency.Set(latency) @@ -460,7 +468,8 @@ func getMessageTimestamp(messages []*message.Message) int64 { func prepareCheckConnectivity(endpoint config.Endpoint, cfg pkgconfigmodel.Reader) (*client.DestinationsContext, *Destination) { ctx := client.NewDestinationsContext() // Lower the timeout to 5s because HTTP connectivity test is done synchronously during the agent bootstrap sequence - destination := newDestination(endpoint, JSONContentType, ctx, time.Second*5, 0, false, client.NewNoopDestinationMetadata(), cfg, metrics.NewNoopPipelineMonitor("")) + destination := newDestination(endpoint, JSONContentType, ctx, time.Second*5, false, client.NewNoopDestinationMetadata(), cfg, 1, 1, metrics.NewNoopPipelineMonitor("")) + return ctx, destination } diff --git a/pkg/logs/client/http/destination_test.go b/pkg/logs/client/http/destination_test.go index 979ac4ee8e1cf..b8c24656bcec9 100644 --- a/pkg/logs/client/http/destination_test.go +++ b/pkg/logs/client/http/destination_test.go @@ -95,7 +95,7 @@ func testNoRetry(t *testing.T, statusCode int) { func retryTest(t *testing.T, statusCode int) { cfg := configmock.New(t) respondChan := make(chan int) - server := NewTestServerWithOptions(statusCode, 0, true, respondChan, cfg) + server := NewTestServerWithOptions(statusCode, 1, true, respondChan, cfg) input := make(chan *message.Payload) output := make(chan *message.Payload) isRetrying := make(chan bool, 1) @@ -125,7 +125,7 @@ func retryTest(t *testing.T, statusCode int) { func TestDestinationContextCancel(t *testing.T) { cfg := configmock.New(t) respondChan := make(chan int) - server := NewTestServerWithOptions(429, 0, true, respondChan, cfg) + server := NewTestServerWithOptions(429, 1, true, respondChan, cfg) input := make(chan *message.Payload) 
output := make(chan *message.Payload) isRetrying := make(chan bool, 1) @@ -324,7 +324,7 @@ func TestDestinationConcurrentSendsShutdownIsHandled(t *testing.T) { func TestBackoffDelayEnabled(t *testing.T) { cfg := configmock.New(t) respondChan := make(chan int) - server := NewTestServerWithOptions(500, 0, true, respondChan, cfg) + server := NewTestServerWithOptions(500, 1, true, respondChan, cfg) input := make(chan *message.Payload) output := make(chan *message.Payload) isRetrying := make(chan bool, 1) @@ -341,7 +341,7 @@ func TestBackoffDelayEnabled(t *testing.T) { func TestBackoffDelayDisabled(t *testing.T) { cfg := configmock.New(t) respondChan := make(chan int) - server := NewTestServerWithOptions(500, 0, false, respondChan, cfg) + server := NewTestServerWithOptions(500, 1, false, respondChan, cfg) input := make(chan *message.Payload) output := make(chan *message.Payload) isRetrying := make(chan bool, 1) @@ -362,7 +362,7 @@ func TestDestinationHA(t *testing.T) { } isEndpointMRF := endpoint.IsMRF - dest := NewDestination(endpoint, JSONContentType, client.NewDestinationsContext(), 1, false, client.NewNoopDestinationMetadata(), configmock.New(t), metrics.NewNoopPipelineMonitor("")) + dest := NewDestination(endpoint, JSONContentType, client.NewDestinationsContext(), false, client.NewNoopDestinationMetadata(), configmock.New(t), 1, 1, metrics.NewNoopPipelineMonitor("")) isDestMRF := dest.IsMRF() assert.Equal(t, isEndpointMRF, isDestMRF) diff --git a/pkg/logs/client/http/sender_pool.go b/pkg/logs/client/http/sender_pool.go new file mode 100644 index 0000000000000..a8bea6bddc94e --- /dev/null +++ b/pkg/logs/client/http/sender_pool.go @@ -0,0 +1,157 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +// Package http provides an HTTP destination for logs. +package http + +import ( + "math" + "sync" + "time" + + "github.com/DataDog/datadog-agent/pkg/logs/client" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +const ( + ewmaAlpha = 0.064 + concurrentSendersEwmaSampleInterval = 1 * time.Second + targetLatency = 150 * time.Millisecond +) + +// SenderPool is an interface for managing the concurrency of senders. +type SenderPool interface { + // Perform an operation with the sender concurrency implementation. + Run(func() destinationResult) +} + +type senderPool struct { + pool chan struct{} + virtualLatency time.Duration + shouldBackoff bool + inUseWorkers int + targetLatencyPerWorker time.Duration + minWorkers int + maxWorkers int + virtualLatencyLastSample time.Time + destMeta *client.DestinationMetadata + windowSum float64 + samples int + ewmaSampleInterval time.Duration + alpha float64 + sync.Mutex +} + +// NewSenderPool returns a new senderPool implementation that limits the number of concurrent senders. 
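+// The pool sizes itself between minWorkers and maxWorkers by sampling send latency: each worker is budgeted
+// targetLatency/minWorkers of latency, and the pool converges toward ceil(virtualLatency / targetLatencyPerWorker) workers.
+// For example (illustrative numbers): with minWorkers = 4 the per-worker budget is 150ms/4 = 37.5ms, so a smoothed
+// send latency of roughly 300ms grows the pool toward ceil(300/37.5) = 8 workers, one worker per sample interval.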
+func NewSenderPool(minWorkers int, maxWorkers int, destMeta *client.DestinationMetadata) SenderPool {
+	return newSenderPool(concurrentSendersEwmaSampleInterval, ewmaAlpha, minWorkers, maxWorkers, targetLatency, destMeta)
+}
+
+func newSenderPool(ewmaSampleInterval time.Duration, alpha float64, minWorkers int, maxWorkers int, targetLatency time.Duration, destMeta *client.DestinationMetadata) *senderPool {
+	if minWorkers <= 0 {
+		minWorkers = 1
+	}
+	if maxWorkers < minWorkers {
+		maxWorkers = minWorkers
+	}
+	targetLatencyPerWorker := targetLatency / time.Duration(minWorkers)
+
+	sp := &senderPool{
+		pool:                     make(chan struct{}, maxWorkers),
+		minWorkers:               minWorkers,
+		maxWorkers:               maxWorkers,
+		targetLatencyPerWorker:   targetLatencyPerWorker,
+		virtualLatencyLastSample: time.Now(),
+		inUseWorkers:             minWorkers,
+		destMeta:                 destMeta,
+		samples:                  0,
+		ewmaSampleInterval:       ewmaSampleInterval,
+		alpha:                    alpha,
+	}
+	// Start with minWorkers senders
+	for range minWorkers {
+		sp.pool <- struct{}{}
+	}
+	return sp
+}
+
+func (l *senderPool) Run(doWork func() destinationResult) {
+	l.resize()
+	<-l.pool
+	go func() {
+		result := doWork()
+		l.pool <- struct{}{}
+
+		if l.maxWorkers == l.minWorkers {
+			return
+		}
+		l.Lock()
+		defer l.Unlock()
+		if result.err != nil {
+			// We only want to trigger a backoff for retryable errors. Issues such as
+			// server 500s should effectively freeze the pipeline pending a resolution.
+			_, ok := result.err.(*client.RetryableError)
+			l.shouldBackoff = l.shouldBackoff || ok
+			return
+		}
+
+		l.windowSum += float64(result.latency)
+		l.samples++
+	}()
+}
+
+// Concurrency is scaled by attempting to converge on a target virtual latency.
+// Virtual latency is the amount of time it takes to submit a payload to the worker pool.
+// If the virtual latency is above the target, more workers are added to the pool until it converges back to the target.
+// This ensures the payload egress rate remains fair no matter how long the HTTP transaction takes
+// (up to maxWorkers).
+// This function is not concurrency safe.
+func (l *senderPool) resize() {
+	if l.maxWorkers == l.minWorkers {
+		return
+	}
+
+	if time.Since(l.virtualLatencyLastSample) >= l.ewmaSampleInterval {
+		l.Lock()
+		windowSum := l.windowSum
+		samples := l.samples
+		l.windowSum = 0
+		l.samples = 0
+		shouldBackoff := l.shouldBackoff
+		l.shouldBackoff = false
+		l.Unlock()
+
+		if samples > 0 {
+			avgLatency := windowSum / float64(samples)
+			l.virtualLatency = time.Duration(float64(l.virtualLatency)*(1.0-l.alpha) + (avgLatency * l.alpha))
+			l.virtualLatencyLastSample = time.Now()
+		}
+
+		targetWorkers := int(math.Ceil(float64(l.virtualLatency) / float64(l.targetLatencyPerWorker)))
+
+		if shouldBackoff {
+			log.Debugf("Backing off sender pool workers in response to transient connection issues with destination.")
+			// Backoff quickly on sender error
+			workersToDrop := l.inUseWorkers - l.minWorkers
+			for i := 0; i < workersToDrop; i++ {
+				<-l.pool
+				l.inUseWorkers--
+			}
+		} else if targetWorkers > l.inUseWorkers && l.inUseWorkers < l.maxWorkers {
+			// If the virtual latency is above the target, add a worker to the pool.
+			l.pool <- struct{}{}
+			l.inUseWorkers++
+		} else if targetWorkers < l.inUseWorkers && l.inUseWorkers > l.minWorkers {
+			// else if the virtual latency is below the target, remove a worker from the pool if more than minWorkers are in use.
+			<-l.pool
+			l.inUseWorkers--
+		}
+
+		metrics.TlmNumWorkers.Set(float64(l.inUseWorkers), l.destMeta.TelemetryName())
+		metrics.TlmVirtualLatency.Set(float64(l.virtualLatency/time.Millisecond), l.destMeta.TelemetryName())
+		log.Debugf("Pool worker count at {%d} after resize", l.inUseWorkers)
+	}
+}
diff --git a/pkg/logs/client/http/sender_pool_test.go b/pkg/logs/client/http/sender_pool_test.go
new file mode 100644
index 0000000000000..4310606c19ef1
--- /dev/null
+++ b/pkg/logs/client/http/sender_pool_test.go
@@ -0,0 +1,83 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package http
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/DataDog/datadog-agent/pkg/logs/client"
+)
+
+var defaultMinWorkers = 4
+var defaultMaxWorkers = defaultMinWorkers * 10
+
+func defaultPool() *senderPool {
+	return newSenderPool(0, ewmaAlpha, defaultMinWorkers, defaultMaxWorkers, targetLatency, client.NewNoopDestinationMetadata())
+}
+
+/*func newPool(min int, max int) *senderPool {
+	return newSenderPool(0, ewmaAlpha, min, max, targetLatency, client.NewNoopDestinationMetadata())
+}
+func TestDestinationError(t *testing.T) {
+	// Get 429, see worker rollback
+	// Get 500, perform no rollback
+}*/
+
+func TestWorkerCounts(t *testing.T) {
+	scenarios := []struct {
+		name                string
+		latency             time.Duration
+		expectedWorkerCount int
+	}{
+		{
+			name:                "Minimum Workers chosen if latency below target",
+			latency:             0,
+			expectedWorkerCount: defaultMinWorkers,
+		},
+		{
+			name:                "Reasonable number of workers added at higher than target latency",
+			latency:             targetLatency * 2,
+			expectedWorkerCount: defaultMinWorkers * 2,
+		},
+		{
+			name:                "Maximum number of workers not exceeded",
+			latency:             targetLatency * 20,
+			expectedWorkerCount: defaultMaxWorkers,
+		},
+	}
+
+	for _, s := range scenarios {
+		t.Run(s.name, func(t *testing.T) {
+			pool := defaultPool()
+
+			for i := 0; i < 100; i++ {
+				pool.Run(func() destinationResult {
+					return destinationResult{latency: s.latency}
+				})
+			}
+
+			assert.Equal(t, s.expectedWorkerCount, pool.inUseWorkers)
+		})
+	}
+}
+
+/*func TestVirtualLatencyCalculations(t *testing.T) {
+	// Condense to 1 number
+	// Condense to 2 number
+	pool := defaultPool()
+
+	for i := 0; i < 100; i++ {
+		pool.Run(func() destinationResult {
+			return destinationResult{latency: targetLatency}
+		})
+	}
+
+	// Should converge on a virtual latency of 10ms since there is only 1 worker, so virtual latency == real latency.
+	require.InDelta(t, 10*time.Millisecond, pool.virtualLatency, float64(3*time.Millisecond))
+}*/
diff --git a/pkg/logs/client/http/sync_destination.go b/pkg/logs/client/http/sync_destination.go
index ed134f6896e8c..49774d1710138 100644
--- a/pkg/logs/client/http/sync_destination.go
+++ b/pkg/logs/client/http/sync_destination.go
@@ -26,15 +26,19 @@ type SyncDestination struct {
 }
 
 // NewSyncDestination returns a new synchronous Destination.
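+// The synchronous destination always runs with a fixed concurrency of one (min == max == 1), so payloads are sent
+// strictly one at a time.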
-func NewSyncDestination(endpoint config.Endpoint, +func NewSyncDestination( + endpoint config.Endpoint, contentType string, destinationsContext *client.DestinationsContext, senderDoneChan chan *sync.WaitGroup, destMeta *client.DestinationMetadata, - cfg pkgconfigmodel.Reader) *SyncDestination { + cfg pkgconfigmodel.Reader, +) *SyncDestination { + minConcurrency := 1 + maxConcurrency := minConcurrency return &SyncDestination{ - destination: newDestination(endpoint, contentType, destinationsContext, time.Second*10, 1, false, destMeta, cfg, metrics.NewNoopPipelineMonitor("0")), + destination: newDestination(endpoint, contentType, destinationsContext, time.Second*10, false, destMeta, cfg, minConcurrency, maxConcurrency, metrics.NewNoopPipelineMonitor("0")), senderDoneChan: senderDoneChan, } } diff --git a/pkg/logs/client/http/test_utils.go b/pkg/logs/client/http/test_utils.go index 06e9da35cf5ab..9ffeed0340261 100644 --- a/pkg/logs/client/http/test_utils.go +++ b/pkg/logs/client/http/test_utils.go @@ -39,11 +39,11 @@ type TestServer struct { // NewTestServer creates a new test server func NewTestServer(statusCode int, cfg pkgconfigmodel.Reader) *TestServer { - return NewTestServerWithOptions(statusCode, 0, true, nil, cfg) + return NewTestServerWithOptions(statusCode, 1, true, nil, cfg) } // NewTestServerWithOptions creates a new test server with concurrency and response control -func NewTestServerWithOptions(statusCode int, senders int, retryDestination bool, respondChan chan int, cfg pkgconfigmodel.Reader) *TestServer { +func NewTestServerWithOptions(statusCode int, concurrentSends int, retryDestination bool, respondChan chan int, cfg pkgconfigmodel.Reader) *TestServer { statusCodeContainer := &StatusCodeContainer{statusCode: statusCode} var request http.Request var mu = sync.Mutex{} @@ -82,7 +82,7 @@ func NewTestServerWithOptions(statusCode int, senders int, retryDestination bool endpoint.BackoffMax = 10 endpoint.RecoveryInterval = 1 - dest := NewDestination(endpoint, JSONContentType, destCtx, senders, retryDestination, client.NewNoopDestinationMetadata(), cfg, metrics.NewNoopPipelineMonitor("")) + dest := NewDestination(endpoint, JSONContentType, destCtx, retryDestination, client.NewNoopDestinationMetadata(), cfg, concurrentSends, concurrentSends, metrics.NewNoopPipelineMonitor("")) return &TestServer{ httpServer: ts, DestCtx: destCtx, diff --git a/pkg/logs/metrics/metrics.go b/pkg/logs/metrics/metrics.go index 0f8aaf35e2838..11750f5d9eca0 100644 --- a/pkg/logs/metrics/metrics.go +++ b/pkg/logs/metrics/metrics.go @@ -76,8 +76,8 @@ var ( //nolint:revive // TODO(AML) Fix revive linter TlmDestinationHttpRespByStatusAndUrl = telemetry.NewCounter("logs", "destination_http_resp", []string{"status_code", "url"}, "Count of http responses by status code and destination url") - // TlmAutoMultilineAggregatorFlush Count of each line flushed from the auto mulitline aggregator. - TlmAutoMultilineAggregatorFlush = telemetry.NewCounter("logs", "auto_multi_line_aggregator_flush", []string{"truncated", "line_type"}, "Count of each line flushed from the auto mulitline aggregator") + // TlmAutoMultilineAggregatorFlush Count of each line flushed from the auto multiline aggregator. 
+ TlmAutoMultilineAggregatorFlush = telemetry.NewCounter("logs", "auto_multi_line_aggregator_flush", []string{"truncated", "line_type"}, "Count of each line flushed from the auto multiline aggregator") // TlmLogsDiscardedFromSDSBuffer how many messages were dropped when waiting for an SDS configuration because the buffer is full TlmLogsDiscardedFromSDSBuffer = telemetry.NewCounter("logs", "sds__dropped_from_buffer", nil, "Count of messages dropped from the buffer while waiting for an SDS configuration") @@ -88,9 +88,13 @@ var ( TlmUtilizationRatio = telemetry.NewGauge("logs_component_utilization", "ratio", []string{"name", "instance"}, "Gauge of the utilization ratio of a component") // TlmUtilizationItems is the capacity of a component by number of elements // Both the number of items and the number of bytes are aggregated and exposed as a ewma. - TlmUtilizationItems = telemetry.NewGauge("logs_component_utilization", "items", []string{"name", "instance"}, "Gauge of the number of items currently held in a component and it's bufferes") + TlmUtilizationItems = telemetry.NewGauge("logs_component_utilization", "items", []string{"name", "instance"}, "Gauge of the number of items currently held in a component and its buffers") // TlmUtilizationBytes is the capacity of a component by number of bytes - TlmUtilizationBytes = telemetry.NewGauge("logs_component_utilization", "bytes", []string{"name", "instance"}, "Gauge of the number of bytes currently held in a component and it's bufferes") + TlmUtilizationBytes = telemetry.NewGauge("logs_component_utilization", "bytes", []string{"name", "instance"}, "Gauge of the number of bytes currently held in a component and its buffers") + // TlmNumWorkers is the number of senders in use + TlmNumWorkers = telemetry.NewGauge("logs_destination", "pool_workers", []string{"instance"}, "") + // TlmVirtualLatency is the amount of time a payload spends waiting for a worker. 
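+	// The gauge value is exported in milliseconds.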
+ TlmVirtualLatency = telemetry.NewGauge("logs_destination", "virtual_latency", []string{"instance"}, "") ) func init() { diff --git a/pkg/logs/pipeline/pipeline.go b/pkg/logs/pipeline/pipeline.go index f2e701598f041..da644d74a0d53 100644 --- a/pkg/logs/pipeline/pipeline.go +++ b/pkg/logs/pipeline/pipeline.go @@ -8,7 +8,6 @@ package pipeline import ( "context" - "strconv" "sync" "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface" @@ -16,9 +15,8 @@ import ( logscompression "github.com/DataDog/datadog-agent/comp/serializer/logscompression/def" pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" + "github.com/DataDog/datadog-agent/pkg/logs/auditor" "github.com/DataDog/datadog-agent/pkg/logs/client" - "github.com/DataDog/datadog-agent/pkg/logs/client/http" - "github.com/DataDog/datadog-agent/pkg/logs/client/tcp" "github.com/DataDog/datadog-agent/pkg/logs/diagnostic" "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/logs/metrics" @@ -34,42 +32,32 @@ type Pipeline struct { flushChan chan struct{} processor *processor.Processor strategy sender.Strategy - sender *sender.Sender + sender sender.PipelineComponent serverless bool flushWg *sync.WaitGroup pipelineMonitor metrics.PipelineMonitor } // NewPipeline returns a new Pipeline -func NewPipeline(outputChan chan *message.Payload, +func NewPipeline( + outputChan chan *message.Payload, processingRules []*config.ProcessingRule, endpoints *config.Endpoints, destinationsContext *client.DestinationsContext, + auditor auditor.Auditor, + senderImpl sender.PipelineComponent, diagnosticMessageReceiver diagnostic.MessageReceiver, serverless bool, + flushWg *sync.WaitGroup, pipelineID int, status statusinterface.Status, hostname hostnameinterface.Component, cfg pkgconfigmodel.Reader, compression logscompression.Component, ) *Pipeline { - - var senderDoneChan chan *sync.WaitGroup - var flushWg *sync.WaitGroup - if serverless { - senderDoneChan = make(chan *sync.WaitGroup) - flushWg = &sync.WaitGroup{} - } - pipelineMonitor := metrics.NewTelemetryPipelineMonitor(strconv.Itoa(pipelineID)) - - mainDestinations := getDestinations(endpoints, destinationsContext, pipelineMonitor, serverless, senderDoneChan, status, cfg) - strategyInput := make(chan *message.Message, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")) - senderInput := make(chan *message.Payload, 1) // Only buffer 1 message since payloads can be large flushChan := make(chan struct{}) - var logsSender *sender.Sender - var encoder processor.Encoder if serverless { encoder = processor.JSONServerlessEncoder @@ -80,30 +68,26 @@ func NewPipeline(outputChan chan *message.Payload, } else { encoder = processor.RawEncoder } - - strategy := getStrategy(strategyInput, senderInput, flushChan, endpoints, serverless, flushWg, pipelineMonitor, compression) - logsSender = sender.NewSender(cfg, senderInput, outputChan, mainDestinations, pkgconfigsetup.Datadog().GetInt("logs_config.payload_channel_size"), senderDoneChan, flushWg, pipelineMonitor) + strategy := getStrategy(strategyInput, senderImpl.In(), flushChan, endpoints, serverless, flushWg, senderImpl.PipelineMonitor(), compression) inputChan := make(chan *message.Message, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")) - processor := processor.New(cfg, inputChan, strategyInput, processingRules, - encoder, diagnosticMessageReceiver, hostname, pipelineMonitor) + encoder, 
diagnosticMessageReceiver, hostname, senderImpl.PipelineMonitor()) return &Pipeline{ InputChan: inputChan, flushChan: flushChan, processor: processor, strategy: strategy, - sender: logsSender, + sender: senderImpl, serverless: serverless, flushWg: flushWg, - pipelineMonitor: pipelineMonitor, + pipelineMonitor: senderImpl.PipelineMonitor(), } } // Start launches the pipeline func (p *Pipeline) Start() { - p.sender.Start() p.strategy.Start() p.processor.Start() } @@ -112,7 +96,6 @@ func (p *Pipeline) Start() { func (p *Pipeline) Stop() { p.processor.Stop() p.strategy.Stop() - p.sender.Stop() } // Flush flushes synchronously the processor and sender managed by this pipeline. @@ -126,39 +109,6 @@ func (p *Pipeline) Flush(ctx context.Context) { } } -func getDestinations(endpoints *config.Endpoints, destinationsContext *client.DestinationsContext, pipelineMonitor metrics.PipelineMonitor, serverless bool, senderDoneChan chan *sync.WaitGroup, status statusinterface.Status, cfg pkgconfigmodel.Reader) *client.Destinations { - reliable := []client.Destination{} - additionals := []client.Destination{} - - if endpoints.UseHTTP { - for i, endpoint := range endpoints.GetReliableEndpoints() { - destMeta := client.NewDestinationMetadata("logs", pipelineMonitor.ID(), "reliable", strconv.Itoa(i)) - if serverless { - reliable = append(reliable, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, senderDoneChan, destMeta, cfg)) - } else { - reliable = append(reliable, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, destMeta, cfg, pipelineMonitor)) - } - } - for i, endpoint := range endpoints.GetUnReliableEndpoints() { - destMeta := client.NewDestinationMetadata("logs", pipelineMonitor.ID(), "unreliable", strconv.Itoa(i)) - if serverless { - additionals = append(additionals, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, senderDoneChan, destMeta, cfg)) - } else { - additionals = append(additionals, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, destMeta, cfg, pipelineMonitor)) - } - } - return client.NewDestinations(reliable, additionals) - } - for _, endpoint := range endpoints.GetReliableEndpoints() { - reliable = append(reliable, tcp.NewDestination(endpoint, endpoints.UseProto, destinationsContext, !serverless, status)) - } - for _, endpoint := range endpoints.GetUnReliableEndpoints() { - additionals = append(additionals, tcp.NewDestination(endpoint, endpoints.UseProto, destinationsContext, false, status)) - } - - return client.NewDestinations(reliable, additionals) -} - //nolint:revive // TODO(AML) Fix revive linter func getStrategy( inputChan chan *message.Message, diff --git a/pkg/logs/pipeline/provider.go b/pkg/logs/pipeline/provider.go index c7979c06623ad..b1f31d1f1794a 100644 --- a/pkg/logs/pipeline/provider.go +++ b/pkg/logs/pipeline/provider.go @@ -7,6 +7,7 @@ package pipeline import ( "context" + "sync" "github.com/hashicorp/go-multierror" "go.uber.org/atomic" @@ -15,17 +16,28 @@ import ( "github.com/DataDog/datadog-agent/comp/logs/agent/config" logscompression "github.com/DataDog/datadog-agent/comp/serializer/logscompression/def" pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/logs/auditor" "github.com/DataDog/datadog-agent/pkg/logs/client" + 
"github.com/DataDog/datadog-agent/pkg/logs/client/http" "github.com/DataDog/datadog-agent/pkg/logs/diagnostic" "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/sds" + "github.com/DataDog/datadog-agent/pkg/logs/sender" "github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface" "github.com/DataDog/datadog-agent/pkg/util/log" "github.com/DataDog/datadog-agent/pkg/util/startstop" ) +const ( + // maxConcurrencyPerPipeline is used to determine the maxSenderConcurrency value for the default provider creation logic. + // We don't want to require users to know enough about our underlying architecture to understand what this value is meant + // to do, so it's currently housed in a constant rather than a config entry. Users who wish to influence min/max + // SenderConcurrency via config options should utilize the endpoint's BatchMaxConcurrentSends override instead. + maxConcurrencyPerPipeline = 10 +) + // Provider provides message channels type Provider interface { Start() @@ -48,12 +60,14 @@ type provider struct { outputChan chan *message.Payload processingRules []*config.ProcessingRule endpoints *config.Endpoints + sender sender.PipelineComponent pipelines []*Pipeline currentPipelineIndex *atomic.Uint32 destinationsContext *client.DestinationsContext serverless bool + flushWg *sync.WaitGroup status statusinterface.Status hostname hostnameinterface.Component @@ -62,7 +76,8 @@ type provider struct { } // NewProvider returns a new Provider -func NewProvider(numberOfPipelines int, +func NewProvider( + numberOfPipelines int, auditor auditor.Auditor, diagnosticMessageReceiver diagnostic.MessageReceiver, processingRules []*config.ProcessingRule, @@ -73,11 +88,42 @@ func NewProvider(numberOfPipelines int, cfg pkgconfigmodel.Reader, compression logscompression.Component, ) Provider { - return newProvider(numberOfPipelines, auditor, diagnosticMessageReceiver, processingRules, endpoints, destinationsContext, false, status, hostname, cfg, compression) + var senderDoneChan chan *sync.WaitGroup + var flushWg *sync.WaitGroup + serverless := false + workerCount := sender.DefaultWorkerCount + minSenderConcurrency := numberOfPipelines + maxSenderConcurrency := numberOfPipelines * maxConcurrencyPerPipeline + if endpoints.BatchMaxConcurrentSend != pkgconfigsetup.DefaultBatchMaxConcurrentSend { + minSenderConcurrency = numberOfPipelines * endpoints.BatchMaxConcurrentSend + maxSenderConcurrency = minSenderConcurrency + } + + return newProvider( + numberOfPipelines, + auditor, + diagnosticMessageReceiver, + processingRules, + endpoints, + destinationsContext, + senderDoneChan, + flushWg, + serverless, + workerCount, + status, + hostname, + cfg, + compression, + minSenderConcurrency, + maxSenderConcurrency, + ) } -// NewServerlessProvider returns a new Provider in serverless mode -func NewServerlessProvider(numberOfPipelines int, +// NewLegacyProvider Creates a pipeline setup that mirrors the legacy implementation of the senders. 
+// There will be one worker created for each pipeline, and each worker will only be able to run one +// operation at a time (unless overridden by DefaultBatchMaxConcurrentSend) +func NewLegacyProvider( + numberOfPipelines int, auditor auditor.Auditor, diagnosticMessageReceiver diagnostic.MessageReceiver, processingRules []*config.ProcessingRule, @@ -87,9 +133,40 @@ func NewServerlessProvider(numberOfPipelines int, hostname hostnameinterface.Component, cfg pkgconfigmodel.Reader, compression logscompression.Component, + serverless bool, ) Provider { + var senderDoneChan chan *sync.WaitGroup + var flushWg *sync.WaitGroup + minSenderConcurrency := 1 + + if serverless { + senderDoneChan = make(chan *sync.WaitGroup) + flushWg = &sync.WaitGroup{} + } else if endpoints.BatchMaxConcurrentSend != pkgconfigsetup.DefaultBatchMaxConcurrentSend { + minSenderConcurrency = endpoints.BatchMaxConcurrentSend + } - return newProvider(numberOfPipelines, auditor, diagnosticMessageReceiver, processingRules, endpoints, destinationsContext, true, status, hostname, cfg, compression) + maxSenderConcurrency := minSenderConcurrency + workerCount := numberOfPipelines + + return newProvider( + numberOfPipelines, + auditor, + diagnosticMessageReceiver, + processingRules, + endpoints, + destinationsContext, + senderDoneChan, + flushWg, + serverless, + workerCount, + status, + hostname, + cfg, + compression, + minSenderConcurrency, + maxSenderConcurrency, + ) } // NewMockProvider creates a new provider that will not provide any pipelines. @@ -97,28 +174,56 @@ func NewMockProvider() Provider { return &provider{} } -func newProvider(numberOfPipelines int, +func newProvider( + numberOfPipelines int, auditor auditor.Auditor, diagnosticMessageReceiver diagnostic.MessageReceiver, processingRules []*config.ProcessingRule, endpoints *config.Endpoints, destinationsContext *client.DestinationsContext, + senderDoneChan chan *sync.WaitGroup, + flushWg *sync.WaitGroup, serverless bool, + workerCount int, status statusinterface.Status, hostname hostnameinterface.Component, cfg pkgconfigmodel.Reader, compression logscompression.Component, + minSenderConcurrency int, + maxSenderConcurrency int, ) Provider { + componentName := "logs" + contentType := http.JSONContentType + + senderImpl := sender.NewSender( + cfg, + auditor, + cfg.GetInt("logs_config.payload_channel_size"), + senderDoneChan, + flushWg, + endpoints, + destinationsContext, + status, + serverless, + componentName, + contentType, + workerCount, + minSenderConcurrency, + maxSenderConcurrency, + ) + return &provider{ numberOfPipelines: numberOfPipelines, auditor: auditor, diagnosticMessageReceiver: diagnosticMessageReceiver, processingRules: processingRules, endpoints: endpoints, + sender: senderImpl, pipelines: []*Pipeline{}, currentPipelineIndex: atomic.NewUint32(0), destinationsContext: destinationsContext, serverless: serverless, + flushWg: flushWg, status: status, hostname: hostname, cfg: cfg, @@ -130,9 +235,25 @@ func newProvider(numberOfPipelines int, func (p *provider) Start() { // This requires the auditor to be started before. 
p.outputChan = p.auditor.Channel() + p.sender.Start() for i := 0; i < p.numberOfPipelines; i++ { - pipeline := NewPipeline(p.outputChan, p.processingRules, p.endpoints, p.destinationsContext, p.diagnosticMessageReceiver, p.serverless, i, p.status, p.hostname, p.cfg, p.compression) + pipeline := NewPipeline( + p.outputChan, + p.processingRules, + p.endpoints, + p.destinationsContext, + p.auditor, + p.sender, + p.diagnosticMessageReceiver, + p.serverless, + p.flushWg, + i, + p.status, + p.hostname, + p.cfg, + p.compression, + ) pipeline.Start() p.pipelines = append(p.pipelines, pipeline) } @@ -142,15 +263,19 @@ func (p *provider) Start() { // this call blocks until all pipelines are stopped func (p *provider) Stop() { stopper := startstop.NewParallelStopper() + + // close the pipelines for _, pipeline := range p.pipelines { stopper.Add(pipeline) } + stopper.Stop() + p.sender.Stop() p.pipelines = p.pipelines[:0] p.outputChan = nil } -// return true if all processor SDS scanners are active. +// return true if all SDS scanners are active. func (p *provider) reconfigureSDS(config []byte, orderType sds.ReconfigureOrderType) (bool, error) { var responses []chan sds.ReconfigureResponse diff --git a/pkg/logs/pipeline/provider_test.go b/pkg/logs/pipeline/provider_test.go index c9aadc09139f5..e55c067f04cda 100644 --- a/pkg/logs/pipeline/provider_test.go +++ b/pkg/logs/pipeline/provider_test.go @@ -15,6 +15,7 @@ import ( "github.com/DataDog/datadog-agent/comp/logs/agent/config" compressionfx "github.com/DataDog/datadog-agent/comp/serializer/logscompression/fx-mock" "github.com/DataDog/datadog-agent/pkg/logs/auditor" + "github.com/DataDog/datadog-agent/pkg/logs/sender" "github.com/DataDog/datadog-agent/pkg/status/health" ) @@ -29,6 +30,7 @@ func (suite *ProviderTestSuite) SetupTest() { suite.p = &provider{ numberOfPipelines: 3, auditor: suite.a, + sender: sender.NewMockSender(), pipelines: []*Pipeline{}, endpoints: config.NewEndpoints(config.Endpoint{}, nil, true, false), currentPipelineIndex: atomic.NewUint32(0), diff --git a/pkg/logs/sender/batch_strategy.go b/pkg/logs/sender/batch_strategy.go index d3cee851b303d..eb90ed0faac74 100644 --- a/pkg/logs/sender/batch_strategy.go +++ b/pkg/logs/sender/batch_strategy.go @@ -196,6 +196,9 @@ func (s *batchStrategy) sendMessages(messages []*message.Message, outputChan cha s.flushWg.Add(1) } + for _, m := range messages { + m.SetContent([]byte{}) + } p := &message.Payload{ Messages: messages, Encoded: encodedPayload.Bytes(), diff --git a/pkg/logs/sender/go.mod b/pkg/logs/sender/go.mod index 243a8f267059b..c49a56f58a0f8 100644 --- a/pkg/logs/sender/go.mod +++ b/pkg/logs/sender/go.mod @@ -7,6 +7,7 @@ require ( github.com/DataDog/datadog-agent/comp/serializer/logscompression v0.64.0-devel github.com/DataDog/datadog-agent/pkg/config/mock v0.61.0 github.com/DataDog/datadog-agent/pkg/config/model v0.64.0-devel + github.com/DataDog/datadog-agent/pkg/logs/auditor v0.0.0-00010101000000-000000000000 github.com/DataDog/datadog-agent/pkg/logs/client v0.61.0 github.com/DataDog/datadog-agent/pkg/logs/message v0.61.0 github.com/DataDog/datadog-agent/pkg/logs/metrics v0.61.0 @@ -36,6 +37,7 @@ require ( github.com/DataDog/datadog-agent/pkg/config/viperconfig v0.0.0-20250218170314-8625d1ac5ae7 // indirect github.com/DataDog/datadog-agent/pkg/fips v0.0.0 // indirect github.com/DataDog/datadog-agent/pkg/logs/status/utils v0.61.0 // indirect + github.com/DataDog/datadog-agent/pkg/status/health v0.61.0 // indirect github.com/DataDog/datadog-agent/pkg/util/backoff v0.61.0 // indirect 
github.com/DataDog/datadog-agent/pkg/util/executable v0.61.0 // indirect github.com/DataDog/datadog-agent/pkg/util/filesystem v0.61.0 // indirect diff --git a/pkg/logs/sender/sender.go b/pkg/logs/sender/sender.go index 51ec837383db4..468e21fbd5f01 100644 --- a/pkg/logs/sender/sender.go +++ b/pkg/logs/sender/sender.go @@ -6,171 +6,145 @@ package sender import ( - "strconv" "sync" - "time" + "github.com/DataDog/datadog-agent/comp/logs/agent/config" pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" + "github.com/DataDog/datadog-agent/pkg/logs/auditor" "github.com/DataDog/datadog-agent/pkg/logs/client" "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/logs/metrics" - "github.com/DataDog/datadog-agent/pkg/telemetry" + "github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface" + "github.com/DataDog/datadog-agent/pkg/util/log" ) -var ( - tlmPayloadsDropped = telemetry.NewCounterWithOpts("logs_sender", "payloads_dropped", []string{"reliable", "destination"}, "Payloads dropped", telemetry.Options{DefaultMetric: true}) - tlmMessagesDropped = telemetry.NewCounterWithOpts("logs_sender", "messages_dropped", []string{"reliable", "destination"}, "Messages dropped", telemetry.Options{DefaultMetric: true}) - tlmSendWaitTime = telemetry.NewCounter("logs_sender", "send_wait", []string{}, "Time spent waiting for all sends to finish") +const ( + // DefaultWorkerCount - By default most pipelines will only require a single sender worker, as the single worker itself can + // concurrently transmit multiple http requests simultaneously. This value is not intended to be configurable, but legacy + // usages of the sender will set this to their own defaults where necessary. See the BatchMaxConcurrentSends setting for + // adjusting the sender concurrency values via config. + DefaultWorkerCount = 1 ) -// Sender sends logs to different destinations. Destinations can be either -// reliable or unreliable. The sender ensures that logs are sent to at least -// one reliable destination and will block the pipeline if they are in an -// error state. Unreliable destinations will only send logs when at least -// one reliable destination is also sending logs. However they do not update -// the auditor or block the pipeline if they fail. There will always be at -// least 1 reliable destination (the main destination). +// PipelineComponent abstracts a pipeline component +type PipelineComponent interface { + In() chan *message.Payload + PipelineMonitor() metrics.PipelineMonitor + Start() + Stop() +} + +// Sender distribute payloads on multiple +// underlying senders. +// Do not re-use a Sender, reinstantiate one instead. type Sender struct { - config pkgconfigmodel.Reader - inputChan chan *message.Payload - outputChan chan *message.Payload - destinations *client.Destinations - done chan struct{} - bufferSize int - senderDoneChan chan *sync.WaitGroup - flushWg *sync.WaitGroup + workers []*worker + queues []chan *message.Payload pipelineMonitor metrics.PipelineMonitor - utilization metrics.UtilizationMonitor + flushWg *sync.WaitGroup + + idx int } // NewSender returns a new sender. 
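+// The Sender fans incoming payloads out to workerCount workers over a shared queue; the min and max worker
+// concurrency values are forwarded to each worker to bound how many sends it may have in flight at once.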
-func NewSender(config pkgconfigmodel.Reader, inputChan chan *message.Payload, outputChan chan *message.Payload, destinations *client.Destinations, bufferSize int, senderDoneChan chan *sync.WaitGroup, flushWg *sync.WaitGroup, pipelineMonitor metrics.PipelineMonitor) *Sender { +func NewSender( + config pkgconfigmodel.Reader, + auditor auditor.Auditor, + bufferSize int, + senderDoneChan chan *sync.WaitGroup, + flushWg *sync.WaitGroup, + endpoints *config.Endpoints, + destinationsCtx *client.DestinationsContext, + status statusinterface.Status, + serverless bool, + componentName string, + contentType string, + workerCount int, + minWorkerConcurrency int, + maxWorkerConcurrency int, +) *Sender { + log.Debugf( + "Creating a new pipeline with %d sender workers, %d min sender concurrency, and %d max sender concurrency", + workerCount, + minWorkerConcurrency, + maxWorkerConcurrency, + ) + pipelineMonitor := metrics.NewTelemetryPipelineMonitor("sender_mux") + + var workers []*worker + + // It simplifies our workflows to keep the queues count value at one. Retaining the minimalistic logic required to support values larger than one + // to allow us to easily explore alternate configurations moving forward. + queuesCount := 1 + + workersPerQueue := workerCount + if workersPerQueue <= 0 { + workersPerQueue = DefaultWorkerCount + } + + queues := make([]chan *message.Payload, queuesCount) + + for i := range queuesCount { + // create a queue + queues[i] = make(chan *message.Payload, workersPerQueue+1) + // output of this queue, create workers + for range workersPerQueue { + worker := newSenderWorker( + config, + auditor, + bufferSize, + senderDoneChan, + flushWg, + endpoints, + destinationsCtx, + status, + serverless, + componentName, + contentType, + minWorkerConcurrency, + maxWorkerConcurrency, + queues[i], + pipelineMonitor, + ) + workers = append(workers, worker) + } + } + return &Sender{ - config: config, - inputChan: inputChan, - outputChan: outputChan, - destinations: destinations, - done: make(chan struct{}), - bufferSize: bufferSize, - senderDoneChan: senderDoneChan, - flushWg: flushWg, - - // Telemetry + workers: workers, pipelineMonitor: pipelineMonitor, - utilization: pipelineMonitor.MakeUtilizationMonitor("sender"), + queues: queues, + flushWg: flushWg, } } -// Start starts the sender. -func (s *Sender) Start() { - go s.run() +// In is the input channel of one or more sender workers +func (s *Sender) In() chan *message.Payload { + s.idx = (s.idx + 1) % len(s.queues) + log.Infof("redistributed to input %d", s.idx) + return s.queues[s.idx] } -// Stop stops the sender, -// this call blocks until inputChan is flushed -func (s *Sender) Stop() { - close(s.inputChan) - <-s.done +// PipelineMonitor returns the pipeline monitor of the sender workers. 
+func (s *Sender) PipelineMonitor() metrics.PipelineMonitor { + return s.pipelineMonitor } -func (s *Sender) run() { - reliableDestinations := buildDestinationSenders(s.config, s.destinations.Reliable, s.outputChan, s.bufferSize) - - sink := additionalDestinationsSink(s.bufferSize) - unreliableDestinations := buildDestinationSenders(s.config, s.destinations.Unreliable, sink, s.bufferSize) - - for payload := range s.inputChan { - s.utilization.Start() - var startInUse = time.Now() - senderDoneWg := &sync.WaitGroup{} - - sent := false - for !sent { - for _, destSender := range reliableDestinations { - if destSender.Send(payload) { - if destSender.destination.Metadata().ReportingEnabled { - s.pipelineMonitor.ReportComponentIngress(payload, destSender.destination.Metadata().MonitorTag()) - } - sent = true - if s.senderDoneChan != nil { - senderDoneWg.Add(1) - s.senderDoneChan <- senderDoneWg - } - } - } - - if !sent { - // Throttle the poll loop while waiting for a send to succeed - // This will only happen when all reliable destinations - // are blocked so logs have no where to go. - time.Sleep(100 * time.Millisecond) - } - } - - for i, destSender := range reliableDestinations { - // If an endpoint is stuck in the previous step, try to buffer the payloads if we have room to mitigate - // loss on intermittent failures. - if !destSender.lastSendSucceeded { - if !destSender.NonBlockingSend(payload) { - tlmPayloadsDropped.Inc("true", strconv.Itoa(i)) - tlmMessagesDropped.Add(float64(len(payload.Messages)), "true", strconv.Itoa(i)) - } - } - } - - // Attempt to send to unreliable destinations - for i, destSender := range unreliableDestinations { - if !destSender.NonBlockingSend(payload) { - tlmPayloadsDropped.Inc("false", strconv.Itoa(i)) - tlmMessagesDropped.Add(float64(len(payload.Messages)), "false", strconv.Itoa(i)) - if s.senderDoneChan != nil { - senderDoneWg.Add(1) - s.senderDoneChan <- senderDoneWg - } - } - } - - inUse := float64(time.Since(startInUse) / time.Millisecond) - tlmSendWaitTime.Add(inUse) - s.utilization.Stop() - - if s.senderDoneChan != nil && s.flushWg != nil { - // Wait for all destinations to finish sending the payload - senderDoneWg.Wait() - // Decrement the wait group when this payload has been sent - s.flushWg.Done() - } - s.pipelineMonitor.ReportComponentEgress(payload, "sender") - } - - // Cleanup the destinations - for _, destSender := range reliableDestinations { - destSender.Stop() - } - for _, destSender := range unreliableDestinations { - destSender.Stop() +// Start starts all sender workers. +func (s *Sender) Start() { + for _, worker := range s.workers { + worker.start() } - close(sink) - s.done <- struct{}{} -} - -// Drains the output channel from destinations that don't update the auditor. 
-func additionalDestinationsSink(bufferSize int) chan *message.Payload { - sink := make(chan *message.Payload, bufferSize) - go func() { - // drain channel, stop when channel is closed - //nolint:revive // TODO(AML) Fix revive linter - for range sink { - } - }() - return sink } -func buildDestinationSenders(config pkgconfigmodel.Reader, destinations []client.Destination, output chan *message.Payload, bufferSize int) []*DestinationSender { - destinationSenders := []*DestinationSender{} - for _, destination := range destinations { - destinationSenders = append(destinationSenders, NewDestinationSender(config, destination, output, bufferSize)) +// Stop stops all sender workers +func (s *Sender) Stop() { + log.Info("sender mux stopping") + for _, s := range s.workers { + s.stop() + } + for i := range s.queues { + close(s.queues[i]) } - return destinationSenders } diff --git a/pkg/logs/sender/sender_mock.go b/pkg/logs/sender/sender_mock.go new file mode 100644 index 0000000000000..f21f447741b80 --- /dev/null +++ b/pkg/logs/sender/sender_mock.go @@ -0,0 +1,49 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +package sender + +import ( + "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" +) + +// NewMockSender generates a mock sender +func NewMockSender() *Mock { + return &Mock{ + inChan: make(chan *message.Payload, 1), + monitor: metrics.NewNoopPipelineMonitor("mock_monitor"), + } +} + +// Mock represents a mocked sender that fulfills the pipeline component interface +type Mock struct { + inChan chan *message.Payload + monitor metrics.PipelineMonitor +} + +// In returns a self-emptying chan +func (s *Mock) In() chan *message.Payload { + return s.inChan +} + +// PipelineMonitor returns an instance of NoopPipelineMonitor +func (s *Mock) PipelineMonitor() metrics.PipelineMonitor { + return s.monitor +} + +// Start begins the routine that empties the In channel +func (s *Mock) Start() { + go func() { + for range s.inChan { //revive:disable-line:empty-block + } + }() +} + +// Stop closes the in channel +func (s *Mock) Stop() { + close(s.inChan) + +} diff --git a/pkg/logs/sender/stream_strategy.go b/pkg/logs/sender/stream_strategy.go index 32cee91a95039..3a5c1849f2cff 100644 --- a/pkg/logs/sender/stream_strategy.go +++ b/pkg/logs/sender/stream_strategy.go @@ -45,11 +45,13 @@ func (s *streamStrategy) Start() { return } + unencodedSize := len(msg.GetContent()) + msg.SetContent([]byte{}) s.outputChan <- &message.Payload{ Messages: []*message.Message{msg}, Encoded: encodedPayload, Encoding: s.compression.ContentEncoding(), - UnencodedSize: len(msg.GetContent()), + UnencodedSize: unencodedSize, } } s.done <- struct{}{} diff --git a/pkg/logs/sender/worker.go b/pkg/logs/sender/worker.go new file mode 100644 index 0000000000000..0e819ae0f6eec --- /dev/null +++ b/pkg/logs/sender/worker.go @@ -0,0 +1,281 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
+ +package sender + +import ( + "strconv" + "sync" + "time" + + "github.com/DataDog/datadog-agent/comp/logs/agent/config" + pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" + "github.com/DataDog/datadog-agent/pkg/logs/auditor" + "github.com/DataDog/datadog-agent/pkg/logs/client" + "github.com/DataDog/datadog-agent/pkg/logs/client/http" + "github.com/DataDog/datadog-agent/pkg/logs/client/tcp" + "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" + "github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface" + "github.com/DataDog/datadog-agent/pkg/telemetry" +) + +var ( + tlmPayloadsDropped = telemetry.NewCounterWithOpts("logs_sender", "payloads_dropped", []string{"reliable", "destination"}, "Payloads dropped", telemetry.Options{DefaultMetric: true}) + tlmMessagesDropped = telemetry.NewCounterWithOpts("logs_sender", "messages_dropped", []string{"reliable", "destination"}, "Messages dropped", telemetry.Options{DefaultMetric: true}) + tlmSendWaitTime = telemetry.NewCounter("logs_sender", "send_wait", []string{}, "Time spent waiting for all sends to finish") +) + +// worker sends logs to different destinations. Destinations can be either +// reliable or unreliable. The worker ensures that logs are sent to at least +// one reliable destination and will block the pipeline if they are in an +// error state. Unreliable destinations will only send logs when at least +// one reliable destination is also sending logs. However they do not update +// the auditor or block the pipeline if they fail. There will always be at +// least 1 reliable destination (the main destination). +type worker struct { + auditor auditor.Auditor + config pkgconfigmodel.Reader + inputChan chan *message.Payload + outputChan chan *message.Payload + destinations *client.Destinations + done chan struct{} + finished chan struct{} + bufferSize int + senderDoneChan chan *sync.WaitGroup + flushWg *sync.WaitGroup + + pipelineMonitor metrics.PipelineMonitor + utilization metrics.UtilizationMonitor +} + +// newSenderWorker returns a new worker. 
+func newSenderWorker( + config pkgconfigmodel.Reader, + auditor auditor.Auditor, + bufferSize int, + senderDoneChan chan *sync.WaitGroup, + flushWg *sync.WaitGroup, + endpoints *config.Endpoints, + destinationsCtx *client.DestinationsContext, + status statusinterface.Status, + serverless bool, + componentName string, + contentType string, + minSenderConcurrency int, + maxSenderConcurrency int, + inputChan chan *message.Payload, + pipelineMonitor metrics.PipelineMonitor, +) *worker { + destinations := getDestinations( + endpoints, + destinationsCtx, + pipelineMonitor, + serverless, + senderDoneChan, + status, + config, + componentName, + contentType, + minSenderConcurrency, + maxSenderConcurrency, + ) + + return newWorkerWithDestinations(config, inputChan, auditor, destinations, bufferSize, senderDoneChan, flushWg, pipelineMonitor) +} + +func newWorkerWithDestinations( + config pkgconfigmodel.Reader, + inputChan chan *message.Payload, + auditor auditor.Auditor, + destinations *client.Destinations, + bufferSize int, + senderDoneChan chan *sync.WaitGroup, + flushWg *sync.WaitGroup, + pipelineMonitor metrics.PipelineMonitor, +) *worker { + return &worker{ + auditor: auditor, + config: config, + inputChan: inputChan, + destinations: destinations, + bufferSize: bufferSize, + senderDoneChan: senderDoneChan, + flushWg: flushWg, + done: make(chan struct{}), + finished: make(chan struct{}), + + // Telemetry + pipelineMonitor: pipelineMonitor, + utilization: pipelineMonitor.MakeUtilizationMonitor("sender"), + } +} + +// start starts the worker. +func (s *worker) start() { + s.outputChan = s.auditor.Channel() + + go s.run() +} + +// stop stops the worker; +// this call blocks until any in-flight payload has been sent and the worker goroutine has exited +func (s *worker) stop() { + s.done <- struct{}{} + <-s.finished +} + +func (s *worker) run() { + reliableDestinations := buildDestinationSenders(s.config, s.destinations.Reliable, s.outputChan, s.bufferSize) + + sink := additionalDestinationsSink(s.bufferSize) + unreliableDestinations := buildDestinationSenders(s.config, s.destinations.Unreliable, sink, s.bufferSize) + continueLoop := true + for continueLoop { + select { + case payload := <-s.inputChan: + s.utilization.Start() + var startInUse = time.Now() + senderDoneWg := &sync.WaitGroup{} + + sent := false + for !sent { + for _, destSender := range reliableDestinations { + if destSender.Send(payload) { + if destSender.destination.Metadata().ReportingEnabled { + s.pipelineMonitor.ReportComponentIngress(payload, destSender.destination.Metadata().MonitorTag()) + } + sent = true + if s.senderDoneChan != nil { + senderDoneWg.Add(1) + s.senderDoneChan <- senderDoneWg + } + } + } + + if !sent { + // Throttle the poll loop while waiting for a send to succeed + // This will only happen when all reliable destinations + // are blocked so logs have nowhere to go. + time.Sleep(100 * time.Millisecond) + } + } + + for i, destSender := range reliableDestinations { + // If an endpoint is stuck in the previous step, try to buffer the payloads if we have room to mitigate + // loss on intermittent failures.
+ if !destSender.lastSendSucceeded { + if !destSender.NonBlockingSend(payload) { + tlmPayloadsDropped.Inc("true", strconv.Itoa(i)) + tlmMessagesDropped.Add(float64(len(payload.Messages)), "true", strconv.Itoa(i)) + } + } + } + + // Attempt to send to unreliable destinations + for i, destSender := range unreliableDestinations { + if !destSender.NonBlockingSend(payload) { + tlmPayloadsDropped.Inc("false", strconv.Itoa(i)) + tlmMessagesDropped.Add(float64(len(payload.Messages)), "false", strconv.Itoa(i)) + if s.senderDoneChan != nil { + senderDoneWg.Add(1) + s.senderDoneChan <- senderDoneWg + } + } + } + + inUse := float64(time.Since(startInUse) / time.Millisecond) + tlmSendWaitTime.Add(inUse) + s.utilization.Stop() + + if s.senderDoneChan != nil && s.flushWg != nil { + // Wait for all destinations to finish sending the payload + senderDoneWg.Wait() + // Decrement the wait group when this payload has been sent + s.flushWg.Done() + } + s.pipelineMonitor.ReportComponentEgress(payload, "sender") + case <-s.done: + continueLoop = false + } + } + + // Cleanup the destinations + for _, destSender := range reliableDestinations { + destSender.Stop() + } + for _, destSender := range unreliableDestinations { + destSender.Stop() + } + close(sink) + s.finished <- struct{}{} +} + +// Drains the output channel from destinations that don't update the auditor. +func additionalDestinationsSink(bufferSize int) chan *message.Payload { + sink := make(chan *message.Payload, bufferSize) + go func() { + // drain channel, stop when channel is closed + //nolint:revive // TODO(AML) Fix revive linter + for range sink { + } + }() + return sink +} + +func buildDestinationSenders(config pkgconfigmodel.Reader, destinations []client.Destination, output chan *message.Payload, bufferSize int) []*DestinationSender { + destinationSenders := []*DestinationSender{} + for _, destination := range destinations { + destinationSenders = append(destinationSenders, NewDestinationSender(config, destination, output, bufferSize)) + } + return destinationSenders +} + +// GetDestinations returns configured destinations instances for the given endpoints. 
+func getDestinations( + endpoints *config.Endpoints, + destinationsContext *client.DestinationsContext, + pipelineMonitor metrics.PipelineMonitor, + serverless bool, + senderDoneChan chan *sync.WaitGroup, + status statusinterface.Status, + cfg pkgconfigmodel.Reader, + componentName string, + contentType string, + minConcurrency int, + maxConcurrency int, +) *client.Destinations { + reliable := []client.Destination{} + additionals := []client.Destination{} + + if endpoints.UseHTTP { + for i, endpoint := range endpoints.GetReliableEndpoints() { + destMeta := client.NewDestinationMetadata(componentName, pipelineMonitor.ID(), "reliable", strconv.Itoa(i)) + if serverless { + reliable = append(reliable, http.NewSyncDestination(endpoint, contentType, destinationsContext, senderDoneChan, destMeta, cfg)) + } else { + reliable = append(reliable, http.NewDestination(endpoint, contentType, destinationsContext, true, destMeta, cfg, minConcurrency, maxConcurrency, pipelineMonitor)) + } + } + for i, endpoint := range endpoints.GetUnReliableEndpoints() { + destMeta := client.NewDestinationMetadata(componentName, pipelineMonitor.ID(), "unreliable", strconv.Itoa(i)) + if serverless { + additionals = append(additionals, http.NewSyncDestination(endpoint, contentType, destinationsContext, senderDoneChan, destMeta, cfg)) + } else { + additionals = append(additionals, http.NewDestination(endpoint, contentType, destinationsContext, false, destMeta, cfg, minConcurrency, maxConcurrency, pipelineMonitor)) + } + } + return client.NewDestinations(reliable, additionals) + } + + for _, endpoint := range endpoints.GetReliableEndpoints() { + reliable = append(reliable, tcp.NewDestination(endpoint, endpoints.UseProto, destinationsContext, !serverless, status)) + } + for _, endpoint := range endpoints.GetUnReliableEndpoints() { + additionals = append(additionals, tcp.NewDestination(endpoint, endpoints.UseProto, destinationsContext, false, status)) + } + + return client.NewDestinations(reliable, additionals) +} diff --git a/pkg/logs/sender/sender_test.go b/pkg/logs/sender/worker_test.go similarity index 69% rename from pkg/logs/sender/sender_test.go rename to pkg/logs/sender/worker_test.go index 4f35558a46974..60eaad05c00f0 100644 --- a/pkg/logs/sender/sender_test.go +++ b/pkg/logs/sender/worker_test.go @@ -22,6 +22,20 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface" ) +type testAuditor struct { + output chan *message.Payload +} + +func (a *testAuditor) Channel() chan *message.Payload { + return a.output +} +func (a *testAuditor) Start() { +} +func (a *testAuditor) Stop() { +} +func (a *testAuditor) GetOffset(_ string) string { return "" } +func (a *testAuditor) GetTailingMode(_ string) string { return "" } + func newMessage(content []byte, source *sources.LogSource, status string) *message.Payload { return &message.Payload{ Messages: []*message.Message{message.NewMessageWithSource(content, status, source, 0)}, @@ -37,7 +51,9 @@ func TestSender(t *testing.T) { source := sources.NewLogSource("", &config.LogsConfig{}) input := make(chan *message.Payload, 1) - output := make(chan *message.Payload, 1) + auditor := &testAuditor{ + output: make(chan *message.Payload, 1), + } destinationsCtx := client.NewDestinationsContext() destinationsCtx.Start() @@ -46,19 +62,19 @@ func TestSender(t *testing.T) { destinations := client.NewDestinations([]client.Destination{destination}, nil) cfg := configmock.New(t) - sender := NewSender(cfg, input, output, destinations, 0, nil, nil,
metrics.NewNoopPipelineMonitor("")) - sender.Start() + worker := newWorkerWithDestinations(cfg, input, auditor, destinations, 0, nil, nil, metrics.NewNoopPipelineMonitor("")) + worker.start() expectedMessage := newMessage([]byte("fake line"), source, "") // Write to the output should relay the message to the output (after sending it on the wire) input <- expectedMessage - message, ok := <-output + message, ok := <-auditor.output assert.True(t, ok) assert.Equal(t, message, expectedMessage) - sender.Stop() + worker.stop() destinationsCtx.Stop() } @@ -66,119 +82,127 @@ func TestSender(t *testing.T) { func TestSenderSingleDestination(t *testing.T) { cfg := configmock.New(t) input := make(chan *message.Payload, 1) - output := make(chan *message.Payload, 1) + auditor := &testAuditor{ + output: make(chan *message.Payload, 1), + } respondChan := make(chan int) - server := http.NewTestServerWithOptions(200, 0, true, respondChan, cfg) + server := http.NewTestServerWithOptions(200, 1, true, respondChan, cfg) destinations := client.NewDestinations([]client.Destination{server.Destination}, nil) - sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) - sender.Start() + worker := newWorkerWithDestinations(cfg, input, auditor, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) + worker.start() input <- &message.Payload{} input <- &message.Payload{} <-respondChan - <-output + <-auditor.output <-respondChan - <-output + <-auditor.output server.Stop() - sender.Stop() + worker.stop() } //nolint:revive // TODO(AML) Fix revive linter func TestSenderDualReliableDestination(t *testing.T) { cfg := configmock.New(t) input := make(chan *message.Payload, 1) - output := make(chan *message.Payload, 1) + auditor := &testAuditor{ + output: make(chan *message.Payload, 1), + } respondChan1 := make(chan int) - server1 := http.NewTestServerWithOptions(200, 0, true, respondChan1, cfg) + server1 := http.NewTestServerWithOptions(200, 1, true, respondChan1, cfg) respondChan2 := make(chan int) - server2 := http.NewTestServerWithOptions(200, 0, true, respondChan2, cfg) + server2 := http.NewTestServerWithOptions(200, 1, true, respondChan2, cfg) destinations := client.NewDestinations([]client.Destination{server1.Destination, server2.Destination}, nil) - sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) - sender.Start() + worker := newWorkerWithDestinations(cfg, input, auditor, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) + worker.start() input <- &message.Payload{} input <- &message.Payload{} <-respondChan1 <-respondChan2 - <-output - <-output + <-auditor.output + <-auditor.output <-respondChan1 <-respondChan2 - <-output - <-output + <-auditor.output + <-auditor.output server1.Stop() server2.Stop() - sender.Stop() + worker.stop() } //nolint:revive // TODO(AML) Fix revive linter func TestSenderUnreliableAdditionalDestination(t *testing.T) { cfg := configmock.New(t) input := make(chan *message.Payload, 1) - output := make(chan *message.Payload, 1) + auditor := &testAuditor{ + output: make(chan *message.Payload, 1), + } respondChan1 := make(chan int) - server1 := http.NewTestServerWithOptions(200, 0, true, respondChan1, cfg) + server1 := http.NewTestServerWithOptions(200, 1, true, respondChan1, cfg) respondChan2 := make(chan int) - server2 := http.NewTestServerWithOptions(200, 0, false, respondChan2, cfg) + server2 := http.NewTestServerWithOptions(200, 1, false, respondChan2, cfg) destinations := 
client.NewDestinations([]client.Destination{server1.Destination}, []client.Destination{server2.Destination}) - sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) - sender.Start() + worker := newWorkerWithDestinations(cfg, input, auditor, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) + worker.start() input <- &message.Payload{} input <- &message.Payload{} <-respondChan1 <-respondChan2 - <-output + <-auditor.output <-respondChan1 <-respondChan2 - <-output + <-auditor.output server1.Stop() server2.Stop() - sender.Stop() + worker.stop() } func TestSenderUnreliableStopsWhenMainFails(t *testing.T) { cfg := configmock.New(t) input := make(chan *message.Payload, 1) - output := make(chan *message.Payload, 1) + auditor := &testAuditor{ + output: make(chan *message.Payload, 1), + } reliableRespond := make(chan int) - reliableServer := http.NewTestServerWithOptions(200, 0, true, reliableRespond, cfg) + reliableServer := http.NewTestServerWithOptions(200, 1, true, reliableRespond, cfg) unreliableRespond := make(chan int) - unreliableServer := http.NewTestServerWithOptions(200, 0, false, unreliableRespond, cfg) + unreliableServer := http.NewTestServerWithOptions(200, 1, false, unreliableRespond, cfg) destinations := client.NewDestinations([]client.Destination{reliableServer.Destination}, []client.Destination{unreliableServer.Destination}) - sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) - sender.Start() + worker := newWorkerWithDestinations(cfg, input, auditor, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) + worker.start() input <- &message.Payload{} <-reliableRespond <-unreliableRespond - <-output + <-auditor.output reliableServer.ChangeStatus(500) @@ -203,32 +227,34 @@ func TestSenderUnreliableStopsWhenMainFails(t *testing.T) { reliableServer.Stop() unreliableServer.Stop() - sender.Stop() + worker.stop() } //nolint:revive // TODO(AML) Fix revive linter func TestSenderReliableContinuseWhenOneFails(t *testing.T) { cfg := configmock.New(t) input := make(chan *message.Payload, 1) - output := make(chan *message.Payload, 1) + auditor := &testAuditor{ + output: make(chan *message.Payload, 1), + } reliableRespond1 := make(chan int) - reliableServer1 := http.NewTestServerWithOptions(200, 0, true, reliableRespond1, cfg) + reliableServer1 := http.NewTestServerWithOptions(200, 1, true, reliableRespond1, cfg) reliableRespond2 := make(chan int) - reliableServer2 := http.NewTestServerWithOptions(200, 0, false, reliableRespond2, cfg) + reliableServer2 := http.NewTestServerWithOptions(200, 1, false, reliableRespond2, cfg) destinations := client.NewDestinations([]client.Destination{reliableServer1.Destination, reliableServer2.Destination}, nil) - sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) - sender.Start() + worker := newWorkerWithDestinations(cfg, input, auditor, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) + worker.start() input <- &message.Payload{} <-reliableRespond1 <-reliableRespond2 - <-output - <-output + <-auditor.output + <-auditor.output reliableServer1.ChangeStatus(500) @@ -236,7 +262,7 @@ func TestSenderReliableContinuseWhenOneFails(t *testing.T) { <-reliableRespond1 // let it respond 500 once <-reliableRespond2 // Second endpoint gets the log line - <-output + <-auditor.output <-reliableRespond1 // its in a loop now, once we respond 500 a second time we know the sender has marked the 
endpoint as retrying // send another log @@ -245,36 +271,38 @@ func TestSenderReliableContinuseWhenOneFails(t *testing.T) { // reliable still stuck in retry loop - responding 500 over and over again. <-reliableRespond1 <-reliableRespond2 // Second output gets the line again - <-output + <-auditor.output reliableServer1.Stop() reliableServer2.Stop() - sender.Stop() + worker.stop() } //nolint:revive // TODO(AML) Fix revive linter func TestSenderReliableWhenOneFailsAndRecovers(t *testing.T) { cfg := configmock.New(t) input := make(chan *message.Payload, 1) - output := make(chan *message.Payload, 1) + auditor := &testAuditor{ + output: make(chan *message.Payload, 1), + } reliableRespond1 := make(chan int) - reliableServer1 := http.NewTestServerWithOptions(200, 0, true, reliableRespond1, cfg) + reliableServer1 := http.NewTestServerWithOptions(200, 1, true, reliableRespond1, cfg) reliableRespond2 := make(chan int) - reliableServer2 := http.NewTestServerWithOptions(200, 0, false, reliableRespond2, cfg) + reliableServer2 := http.NewTestServerWithOptions(200, 1, false, reliableRespond2, cfg) destinations := client.NewDestinations([]client.Destination{reliableServer1.Destination, reliableServer2.Destination}, nil) - sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) - sender.Start() + worker := newWorkerWithDestinations(cfg, input, auditor, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor("")) + worker.start() input <- &message.Payload{} <-reliableRespond1 <-reliableRespond2 - <-output - <-output + <-auditor.output + <-auditor.output reliableServer1.ChangeStatus(500) @@ -282,7 +310,7 @@ func TestSenderReliableWhenOneFailsAndRecovers(t *testing.T) { <-reliableRespond1 // let it respond 500 once <-reliableRespond2 // Second endpoint gets the log line - <-output + <-auditor.output <-reliableRespond1 // its in a loop now, once we respond 500 a second time we know the sender has marked the endpoint as retrying // send another log @@ -291,7 +319,7 @@ func TestSenderReliableWhenOneFailsAndRecovers(t *testing.T) { // reliable still stuck in retry loop - responding 500 over and over again. 
<-reliableRespond1 <-reliableRespond2 // Second output gets the line again - <-output + <-auditor.output // Recover the first server reliableServer1.ChangeStatus(200) @@ -303,17 +331,17 @@ func TestSenderReliableWhenOneFailsAndRecovers(t *testing.T) { } } - <-output // get the buffered log line that was stuck + <-auditor.output // get the buffered log line that was stuck // Make sure everything is unblocked input <- &message.Payload{} <-reliableRespond1 <-reliableRespond2 - <-output - <-output + <-auditor.output + <-auditor.output reliableServer1.Stop() reliableServer2.Stop() - sender.Stop() + worker.stop() } diff --git a/pkg/security/reporter/reporter.go b/pkg/security/reporter/reporter.go index a9fe090cff9c7..aaf8a6a204298 100644 --- a/pkg/security/reporter/reporter.go +++ b/pkg/security/reporter/reporter.go @@ -52,7 +52,19 @@ func newReporter(hostname string, stopper startstop.Stopper, sourceName, sourceT stopper.Add(auditor) // setup the pipeline provider that provides pairs of processor and sender - pipelineProvider := pipeline.NewProvider(4, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, context, &seccommon.NoopStatusProvider{}, hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog(), compression) + pipelineProvider := pipeline.NewLegacyProvider( + 4, + auditor, + &diagnostic.NoopMessageReceiver{}, + nil, + endpoints, + context, + &seccommon.NoopStatusProvider{}, + hostnameimpl.NewHostnameService(), + pkgconfigsetup.Datadog(), + compression, + false, + ) pipelineProvider.Start() stopper.Add(pipelineProvider) diff --git a/test/regression/cases/file_to_blackhole_100ms_latency_linear_load/datadog-agent/conf.d/disk-listener.d/conf.yaml b/test/regression/cases/file_to_blackhole_100ms_latency_linear_load/datadog-agent/conf.d/disk-listener.d/conf.yaml new file mode 100644 index 0000000000000..ec51a59de1c46 --- /dev/null +++ b/test/regression/cases/file_to_blackhole_100ms_latency_linear_load/datadog-agent/conf.d/disk-listener.d/conf.yaml @@ -0,0 +1,5 @@ +logs: + - type: file + path: "/smp-shared/*.log" + service: "my-service" + source: "my-client-app" diff --git a/test/regression/cases/file_to_blackhole_100ms_latency_linear_load/datadog-agent/datadog.yaml b/test/regression/cases/file_to_blackhole_100ms_latency_linear_load/datadog-agent/datadog.yaml new file mode 100644 index 0000000000000..efe55170e4dc9 --- /dev/null +++ b/test/regression/cases/file_to_blackhole_100ms_latency_linear_load/datadog-agent/datadog.yaml @@ -0,0 +1,21 @@ +auth_token_file_path: /tmp/agent-auth-token + +# Disable cloud detection. This stops the Agent from poking around the +# execution environment & network. This is particularly important if the target +# has network access. 
+cloud_provider_metadata: [] + +dd_url: http://127.0.0.1:9091 + +logs_enabled: true +logs_config: + logs_dd_url: 127.0.0.1:9092 + http_protocol: auto + file_scan_period: 1 + logs_no_ssl: true + force_use_http: true + +process_config.process_dd_url: http://localhost:9093 + +telemetry.enabled: true +telemetry.checks: '*' diff --git a/test/regression/cases/file_to_blackhole_100ms_latency_linear_load/experiment.yaml b/test/regression/cases/file_to_blackhole_100ms_latency_linear_load/experiment.yaml new file mode 100644 index 0000000000000..de233425360ae --- /dev/null +++ b/test/regression/cases/file_to_blackhole_100ms_latency_linear_load/experiment.yaml @@ -0,0 +1,32 @@ +optimization_goal: egress_throughput +erratic: false + +target: + name: datadog-agent + cpu_allotment: 4 + memory_allotment: 2GiB + + environment: + DD_API_KEY: 00000001 + DD_HOSTNAME: smp-regression + + profiling_environment: + DD_INTERNAL_PROFILING_BLOCK_PROFILE_RATE: 10000 + DD_INTERNAL_PROFILING_CPU_DURATION: 1m + DD_INTERNAL_PROFILING_DELTA_PROFILES: true + DD_INTERNAL_PROFILING_ENABLED: true + DD_INTERNAL_PROFILING_ENABLE_GOROUTINE_STACKTRACES: true + DD_INTERNAL_PROFILING_MUTEX_PROFILE_FRACTION: 10 + DD_INTERNAL_PROFILING_PERIOD: 1m + DD_INTERNAL_PROFILING_UNIX_SOCKET: /smp-host/apm.socket + DD_PROFILING_EXECUTION_TRACE_ENABLED: true + DD_PROFILING_EXECUTION_TRACE_PERIOD: 1m + DD_PROFILING_WAIT_PROFILE: true + +checks: + - name: memory_usage + description: "Memory usage" + bounds: + series: total_rss_bytes + # The machine has 12GiB free. + upper_bound: 1.2GiB diff --git a/test/regression/cases/file_to_blackhole_100ms_latency_linear_load/lading/lading.yaml b/test/regression/cases/file_to_blackhole_100ms_latency_linear_load/lading/lading.yaml new file mode 100644 index 0000000000000..274b94defcd8a --- /dev/null +++ b/test/regression/cases/file_to_blackhole_100ms_latency_linear_load/lading/lading.yaml @@ -0,0 +1,34 @@ +generator: + - file_gen: + logrotate_fs: + seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, + 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] + load_profile: + linear: + # Over a ten minute experiment this will mean load proceeds from + # 10MB to 310MB per second. As a file rotates every 500MB we will + # see files rotating, by the end, every two seconds. + # + # Agent is not expected to keep up. 
+ initial_bytes_per_second: 10MB + rate: 0.5MB + concurrent_logs: 1 + maximum_bytes_per_log: 500MB + total_rotations: 5 + max_depth: 0 + variant: "ascii" + maximum_prebuild_cache_size_bytes: 300MB + mount_point: /smp-shared + +blackhole: + - http: + binding_addr: "127.0.0.1:9091" + - http: + binding_addr: "127.0.0.1:9092" + response_delay_millis: 100 + - http: + binding_addr: "127.0.0.1:9093" + +target_metrics: + - prometheus: + uri: "http://127.0.0.1:5000/telemetry" diff --git a/test/regression/cases/file_to_blackhole_100ms_latency_multifile/datadog-agent/conf.d/disk-listener.d/conf.yaml b/test/regression/cases/file_to_blackhole_100ms_latency_multifile/datadog-agent/conf.d/disk-listener.d/conf.yaml new file mode 100644 index 0000000000000..ec51a59de1c46 --- /dev/null +++ b/test/regression/cases/file_to_blackhole_100ms_latency_multifile/datadog-agent/conf.d/disk-listener.d/conf.yaml @@ -0,0 +1,5 @@ +logs: + - type: file + path: "/smp-shared/*.log" + service: "my-service" + source: "my-client-app" diff --git a/test/regression/cases/file_to_blackhole_100ms_latency_multifile/datadog-agent/datadog.yaml b/test/regression/cases/file_to_blackhole_100ms_latency_multifile/datadog-agent/datadog.yaml new file mode 100644 index 0000000000000..fc6d0dc948e64 --- /dev/null +++ b/test/regression/cases/file_to_blackhole_100ms_latency_multifile/datadog-agent/datadog.yaml @@ -0,0 +1,20 @@ +auth_token_file_path: /tmp/agent-auth-token + +# Disable cloud detection. This stops the Agent from poking around the +# execution environment & network. This is particularly important if the target +# has network access. +cloud_provider_metadata: [] + +dd_url: http://127.0.0.1:9091 + +logs_enabled: true +logs_config: + logs_dd_url: 127.0.0.1:9092 + file_scan_period: 1 + logs_no_ssl: true + force_use_http: true + +process_config.process_dd_url: http://localhost:9093 + +telemetry.enabled: true +telemetry.checks: '*' diff --git a/test/regression/cases/file_to_blackhole_100ms_latency_multifile/experiment.yaml b/test/regression/cases/file_to_blackhole_100ms_latency_multifile/experiment.yaml new file mode 100644 index 0000000000000..d828308511656 --- /dev/null +++ b/test/regression/cases/file_to_blackhole_100ms_latency_multifile/experiment.yaml @@ -0,0 +1,39 @@ +optimization_goal: egress_throughput +erratic: false + +target: + name: datadog-agent + cpu_allotment: 4 + memory_allotment: 2GiB + + environment: + DD_API_KEY: 00000001 + DD_HOSTNAME: smp-regression + + profiling_environment: + DD_INTERNAL_PROFILING_BLOCK_PROFILE_RATE: 10000 + DD_INTERNAL_PROFILING_CPU_DURATION: 1m + DD_INTERNAL_PROFILING_DELTA_PROFILES: true + DD_INTERNAL_PROFILING_ENABLED: true + DD_INTERNAL_PROFILING_ENABLE_GOROUTINE_STACKTRACES: true + DD_INTERNAL_PROFILING_MUTEX_PROFILE_FRACTION: 10 + DD_INTERNAL_PROFILING_PERIOD: 1m + DD_INTERNAL_PROFILING_UNIX_SOCKET: /smp-host/apm.socket + DD_PROFILING_EXECUTION_TRACE_ENABLED: true + DD_PROFILING_EXECUTION_TRACE_PERIOD: 1m + DD_PROFILING_WAIT_PROFILE: true + +checks: + - name: memory_usage + description: "Memory usage" + bounds: + series: total_rss_bytes + # The machine has 12GiB free. + upper_bound: 1.2GiB + + # Temporarily disable as we expect this to be failing, but not for long. 
+ # - name: lost_bytes + # description: "Allowable bytes not polled by log Agent" + # bounds: + # series: lost_bytes + # upper_bound: 0KiB diff --git a/test/regression/cases/file_to_blackhole_100ms_latency_multifile/lading/lading.yaml b/test/regression/cases/file_to_blackhole_100ms_latency_multifile/lading/lading.yaml new file mode 100644 index 0000000000000..9831ff011c782 --- /dev/null +++ b/test/regression/cases/file_to_blackhole_100ms_latency_multifile/lading/lading.yaml @@ -0,0 +1,27 @@ +generator: + - file_gen: + logrotate_fs: + seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, + 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] + load_profile: + constant: 2MB + concurrent_logs: 12 + maximum_bytes_per_log: 500MB + total_rotations: 5 + max_depth: 0 + variant: "ascii" + maximum_prebuild_cache_size_bytes: 300MB + mount_point: /smp-shared + +blackhole: + - http: + binding_addr: "127.0.0.1:9091" + - http: + binding_addr: "127.0.0.1:9092" + response_delay_millis: 1000 + - http: + binding_addr: "127.0.0.1:9093" + +target_metrics: + - prometheus: + uri: "http://127.0.0.1:5000/telemetry"
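Note on the new sender shape: NewSender now builds a small mux of payload queues, each drained by a pool of workers that own their own destination senders, and In() hands callers a queue round-robin. Below is a rough, standalone sketch of that fan-out pattern only; the type and function names are illustrative stand-ins, not the Agent's API, and the worker body stands in for the real destination-sender loop.

```go
package main

import (
	"fmt"
	"sync"
)

// payload is a stand-in for *message.Payload in this sketch.
type payload struct{ line string }

// mux fans payloads out across queues, each drained by several workers,
// mirroring the In()/Stop() shape of the refactored Sender.
type mux struct {
	queues []chan payload
	idx    int
	wg     sync.WaitGroup
}

func newMux(queueCount, workersPerQueue int) *mux {
	m := &mux{queues: make([]chan payload, queueCount)}
	for i := range m.queues {
		m.queues[i] = make(chan payload, workersPerQueue+1)
		for w := 0; w < workersPerQueue; w++ {
			m.wg.Add(1)
			id := i*workersPerQueue + w
			go func(q <-chan payload) {
				defer m.wg.Done()
				// Worker loop: in the real code this is where payloads are
				// handed to the reliable/unreliable destination senders.
				for p := range q {
					fmt.Printf("worker %d sent %q\n", id, p.line)
				}
			}(m.queues[i])
		}
	}
	return m
}

// In returns the next queue, rotating round-robin across queues.
func (m *mux) In() chan payload {
	m.idx = (m.idx + 1) % len(m.queues)
	return m.queues[m.idx]
}

// Stop closes every queue and waits for the workers to drain them.
func (m *mux) Stop() {
	for _, q := range m.queues {
		close(q)
	}
	m.wg.Wait()
}

func main() {
	m := newMux(1, 4) // one queue, four workers, matching the single-queue default above
	in := m.In()
	for i := 0; i < 8; i++ {
		in <- payload{line: fmt.Sprintf("log line %d", i)}
	}
	m.Stop()
}
```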