Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed Sender Draft #34648

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,19 +415,25 @@ func newHTTPPassthroughPipeline(

pipelineMonitor := metrics.NewNoopPipelineMonitor(strconv.Itoa(pipelineID))

reliable := []client.Destination{}
for i, endpoint := range endpoints.GetReliableEndpoints() {
destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "reliable", strconv.Itoa(i))
reliable = append(reliable, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor))
}
additionals := []client.Destination{}
for i, endpoint := range endpoints.GetUnReliableEndpoints() {
destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "unreliable", strconv.Itoa(i))
additionals = append(additionals, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor))
}
destinations := client.NewDestinations(reliable, additionals)
inputChan := make(chan *message.Message, endpoints.InputChanSize)
senderInput := make(chan *message.Payload, 1) // Only buffer 1 message since payloads can be large

a := auditor.NewNullAuditor()
senderImpl := sender.NewSender(
coreConfig,
a,
10, // Buffer Size
nil, // senderDoneChan, required only for serverless
nil, // flushWg, required only for serverless
endpoints,
destinationsContext,
nil, // status, required only for tcp endpoint support
false,
desc.eventType,
desc.contentType,
sender.DefaultWorkerCount,
endpoints.BatchMaxConcurrentSend,
endpoints.BatchMaxConcurrentSend,
)

var encoder compressioncommon.Compressor
encoder = compressor.NewCompressor("none", 0)
Expand All @@ -437,10 +443,10 @@ func newHTTPPassthroughPipeline(

var strategy sender.Strategy
if desc.contentType == logshttp.ProtobufContentType {
strategy = sender.NewStreamStrategy(inputChan, senderInput, encoder)
strategy = sender.NewStreamStrategy(inputChan, senderImpl.In(), encoder)
} else {
strategy = sender.NewBatchStrategy(inputChan,
senderInput,
senderImpl.In(),
make(chan struct{}),
false,
nil,
Expand All @@ -453,11 +459,10 @@ func newHTTPPassthroughPipeline(
pipelineMonitor)
}

a := auditor.NewNullAuditor()
log.Debugf("Initialized event platform forwarder pipeline. eventType=%s mainHosts=%s additionalHosts=%s batch_max_concurrent_send=%d batch_max_content_size=%d batch_max_size=%d, input_chan_size=%d",
desc.eventType, joinHosts(endpoints.GetReliableEndpoints()), joinHosts(endpoints.GetUnReliableEndpoints()), endpoints.BatchMaxConcurrentSend, endpoints.BatchMaxContentSize, endpoints.BatchMaxSize, endpoints.InputChanSize)
return &passthroughPipeline{
sender: sender.NewSender(coreConfig, senderInput, a.Channel(), destinations, 10, nil, nil, pipelineMonitor),
sender: senderImpl,
strategy: strategy,
in: inputChan,
auditor: a,
Expand Down
5 changes: 3 additions & 2 deletions comp/logs/agent/agentimpl/agent_core_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ func (a *logAgent) SetupPipeline(processingRules []*config.ProcessingRule, wmeta
diagnosticMessageReceiver := diagnostic.NewBufferedMessageReceiver(nil, a.hostname)

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"), auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config, a.compression)
pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"),
auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx,
NewStatusProvider(), a.hostname, a.config, a.compression)

// setup the launchers
lnchrs := launchers.NewLaunchers(a.sources, pipelineProvider, auditor, a.tracker)
Expand Down Expand Up @@ -78,7 +80,6 @@ func (a *logAgent) SetupPipeline(processingRules []*config.ProcessingRule, wmeta
a.launchers = lnchrs
a.health = health
a.diagnosticMessageReceiver = diagnosticMessageReceiver

}

// buildEndpoints builds endpoints for the logs agent
Expand Down
14 changes: 13 additions & 1 deletion comp/logs/agent/agentimpl/agent_serverless_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,19 @@ func (a *logAgent) SetupPipeline(
destinationsCtx := client.NewDestinationsContext()

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewServerlessProvider(a.config.GetInt("logs_config.pipelines"), a.auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config, a.compression)
pipelineProvider := pipeline.NewLegacyProvider(
a.config.GetInt("logs_config.pipelines"),
a.auditor,
diagnosticMessageReceiver,
processingRules,
a.endpoints,
destinationsCtx,
NewStatusProvider(),
a.hostname,
a.config,
a.compression,
true,
)

lnchrs := launchers.NewLaunchers(a.sources, pipelineProvider, a.auditor, a.tracker)
lnchrs.AddLauncher(channel.NewLauncher())
Expand Down
5 changes: 4 additions & 1 deletion comp/logs/agent/agentimpl/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package agentimpl
import (
"bytes"
"context"
"expvar"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -158,7 +159,9 @@ func (suite *AgentTestSuite) testAgent(endpoints *config.Endpoints) {
assert.Equal(suite.T(), zero, metrics.LogsProcessed.Value())
assert.Equal(suite.T(), zero, metrics.LogsSent.Value())
assert.Equal(suite.T(), zero, metrics.DestinationErrors.Value())
assert.Equal(suite.T(), "{}", metrics.DestinationLogsDropped.String())
metrics.DestinationLogsDropped.Do(func(k expvar.KeyValue) {
assert.Equal(suite.T(), k.Value.String(), "0")
})

agent.startPipeline()
sources.AddSource(suite.source)
Expand Down
5 changes: 3 additions & 2 deletions comp/logs/agent/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"testing"
"time"

"github.com/DataDog/datadog-agent/comp/core/config"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"

"github.com/DataDog/datadog-agent/comp/core/config"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
)

type ConfigTestSuite struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ func (a *Agent) SetupPipeline(
destinationsCtx := client.NewDestinationsContext()

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"), auditor, &diagnostic.NoopMessageReceiver{}, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config, a.compression)
pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"), auditor,
&diagnostic.NoopMessageReceiver{}, processingRules, a.endpoints,
destinationsCtx, NewStatusProvider(), a.hostname, a.config, a.compression)

a.auditor = auditor
a.destinationsCtx = destinationsCtx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package logsagentpipelineimpl

import (
"context"
"expvar"
"testing"
"time"

Expand Down Expand Up @@ -97,7 +98,9 @@ func (suite *AgentTestSuite) testAgent(endpoints *config.Endpoints) {
assert.Equal(suite.T(), zero, metrics.LogsProcessed.Value())
assert.Equal(suite.T(), zero, metrics.LogsSent.Value())
assert.Equal(suite.T(), zero, metrics.DestinationErrors.Value())
assert.Equal(suite.T(), "{}", metrics.DestinationLogsDropped.String())
metrics.DestinationLogsDropped.Do(func(k expvar.KeyValue) {
assert.Equal(suite.T(), k.Value.String(), "0")
})

agent.startPipeline()
suite.sendTestMessages(agent)
Expand Down
1 change: 0 additions & 1 deletion comp/otelcol/otlp/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestGetComponents(t *testing.T) {

func AssertSucessfulRun(t *testing.T, pcfg PipelineConfig) {
fakeTagger := mock.SetupFakeTagger(t)

p, err := NewPipeline(pcfg, serializermock.NewMetricSerializer(t), make(chan *message.Message), fakeTagger)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down
14 changes: 13 additions & 1 deletion pkg/compliance/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,19 @@ func NewLogReporter(hostname string, sourceName, sourceType string, endpoints *c
auditor.Start()

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(4, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, dstcontext, &common.NoopStatusProvider{}, hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog(), compression)
pipelineProvider := pipeline.NewLegacyProvider(
4,
auditor,
&diagnostic.NoopMessageReceiver{},
nil,
endpoints,
dstcontext,
&common.NoopStatusProvider{},
hostnameimpl.NewHostnameService(),
pkgconfigsetup.Datadog(),
compression,
false,
)
pipelineProvider.Start()

logSource := sources.NewLogSource(
Expand Down
55 changes: 32 additions & 23 deletions pkg/logs/client/http/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ var (
//nolint:revive // TODO(AML) Fix revive linter
var emptyJsonPayload = message.Payload{Messages: []*message.Message{}, Encoded: []byte("{}")}

// destinationResult captures the outcome of a single payload send attempt
// performed by a Destination worker: the wall-clock duration of the attempt
// and the error it ended with, if any. It is returned from sendAndRetry so
// the sender pool can observe per-send latency and failure.
type destinationResult struct {
	latency time.Duration // time elapsed around the unconditionalSend call (measured with time.Since)
	err     error         // nil on success; the send error otherwise (may be context.Canceled on shutdown)
}

// Destination sends a payload over HTTP.
type Destination struct {
// Config
Expand All @@ -70,8 +75,8 @@ type Destination struct {
isMRF bool

// Concurrency
climit chan struct{} // semaphore for limiting concurrent background sends
wg sync.WaitGroup
senderPool SenderPool
wg sync.WaitGroup

// Retry
backoff backoff.Policy
Expand All @@ -94,36 +99,36 @@ type Destination struct {
func NewDestination(endpoint config.Endpoint,
contentType string,
destinationsContext *client.DestinationsContext,
maxConcurrentBackgroundSends int,
shouldRetry bool,
destMeta *client.DestinationMetadata,
cfg pkgconfigmodel.Reader,
minConcurrency int,
maxConcurrency int,
pipelineMonitor metrics.PipelineMonitor) *Destination {

return newDestination(endpoint,
contentType,
destinationsContext,
time.Second*10,
maxConcurrentBackgroundSends,
shouldRetry,
destMeta,
cfg,
minConcurrency,
maxConcurrency,
pipelineMonitor)
}

func newDestination(endpoint config.Endpoint,
contentType string,
destinationsContext *client.DestinationsContext,
timeout time.Duration,
maxConcurrentBackgroundSends int,
shouldRetry bool,
destMeta *client.DestinationMetadata,
cfg pkgconfigmodel.Reader,
minConcurrency int,
maxConcurrency int,
pipelineMonitor metrics.PipelineMonitor) *Destination {

if maxConcurrentBackgroundSends <= 0 {
maxConcurrentBackgroundSends = 1
}
policy := backoff.NewExpBackoffPolicy(
endpoint.BackoffFactor,
endpoint.BackoffBase,
Expand All @@ -140,14 +145,16 @@ func newDestination(endpoint config.Endpoint,
metrics.DestinationExpVars.Set(destMeta.TelemetryName(), expVars)
}

senderPool := NewSenderPool(minConcurrency, maxConcurrency, destMeta)

return &Destination{
host: endpoint.Host,
url: buildURL(endpoint),
endpoint: endpoint,
contentType: contentType,
client: httputils.NewResetClient(endpoint.ConnectionResetInterval, httpClientFactory(timeout, cfg)),
destinationsContext: destinationsContext,
climit: make(chan struct{}, maxConcurrentBackgroundSends),
senderPool: senderPool,
wg: sync.WaitGroup{},
backoff: policy,
protocol: endpoint.Protocol,
Expand Down Expand Up @@ -222,20 +229,16 @@ func (d *Destination) run(input chan *message.Payload, output chan *message.Payl

func (d *Destination) sendConcurrent(payload *message.Payload, output chan *message.Payload, isRetrying chan bool) {
d.wg.Add(1)
d.climit <- struct{}{}
go func() {
defer func() {
<-d.climit
d.wg.Done()
}()
d.sendAndRetry(payload, output, isRetrying)
}()
d.senderPool.Run(func() destinationResult {
result := d.sendAndRetry(payload, output, isRetrying)
d.wg.Done()
return result
})
}

// Send sends a payload over HTTP,
func (d *Destination) sendAndRetry(payload *message.Payload, output chan *message.Payload, isRetrying chan bool) {
func (d *Destination) sendAndRetry(payload *message.Payload, output chan *message.Payload, isRetrying chan bool) destinationResult {
for {

d.retryLock.Lock()
nbErrors := d.nbErrors
d.retryLock.Unlock()
Expand All @@ -249,7 +252,13 @@ func (d *Destination) sendAndRetry(payload *message.Payload, output chan *messag
metrics.TlmRetryCount.Add(1)
}

start := time.Now()
err := d.unconditionalSend(payload)
latency := time.Since(start)
result := destinationResult{
latency: latency,
err: err,
}

if err != nil {
metrics.DestinationErrors.Add(1)
Expand All @@ -265,7 +274,7 @@ func (d *Destination) sendAndRetry(payload *message.Payload, output chan *messag

if err == context.Canceled {
d.updateRetryState(nil, isRetrying)
return
return result
}

if d.shouldRetry {
Expand All @@ -277,7 +286,7 @@ func (d *Destination) sendAndRetry(payload *message.Payload, output chan *messag
metrics.LogsSent.Add(int64(len(payload.Messages)))
metrics.TlmLogsSent.Add(float64(len(payload.Messages)))
output <- payload
return
return result
}
}

Expand Down Expand Up @@ -322,7 +331,6 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) {

req = req.WithContext(ctx)
resp, err := d.client.Do(req)

latency := time.Since(then).Milliseconds()
metrics.TlmSenderLatency.Observe(float64(latency))
metrics.SenderLatency.Set(latency)
Expand Down Expand Up @@ -460,7 +468,8 @@ func getMessageTimestamp(messages []*message.Message) int64 {
func prepareCheckConnectivity(endpoint config.Endpoint, cfg pkgconfigmodel.Reader) (*client.DestinationsContext, *Destination) {
ctx := client.NewDestinationsContext()
// Lower the timeout to 5s because HTTP connectivity test is done synchronously during the agent bootstrap sequence
destination := newDestination(endpoint, JSONContentType, ctx, time.Second*5, 0, false, client.NewNoopDestinationMetadata(), cfg, metrics.NewNoopPipelineMonitor(""))
destination := newDestination(endpoint, JSONContentType, ctx, time.Second*5, false, client.NewNoopDestinationMetadata(), cfg, 1, 1, metrics.NewNoopPipelineMonitor(""))

return ctx, destination
}

Expand Down
Loading
Loading