Skip to content

Commit

Permalink
[WIP] Fix panic when more than 32767 clients are active
Browse files Browse the repository at this point in the history
  • Loading branch information
belimawr committed Mar 22, 2024
1 parent 62235ef commit 4210d81
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func startHarvester(
defer releaseResource(resource)

client, err := hg.pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
// CloseRef: ctx.Cancelation,
EventListener: newInputACKHandler(hg.ackCH),
})
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion filebeat/input/kafka/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,13 @@ func (input *kafkaInput) Run(ctx input.Context, pipeline beat.Pipeline) error {
}
}),
),
CloseRef: ctx.Cancelation,
// CloseRef: ctx.Cancelation,
WaitClose: input.config.WaitClose,
})
if err != nil {
return err
}
defer client.Close()

log.Info("Starting Kafka input")
defer log.Info("Kafka input stopped")
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/v2/input-cursor/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (inp *managedInput) runSource(
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
// CloseRef: ctx.Cancelation,
EventListener: newInputACKHandler(ctx.Logger),
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/v2/input-stateless/stateless.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (si configuredInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) (
PublishMode: beat.DefaultGuarantees,

// configure pipeline to disconnect input on stop signal.
CloseRef: ctx.Cancelation,
// CloseRef: ctx.Cancelation,
})
if err != nil {
return err
Expand Down
6 changes: 4 additions & 2 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type client struct {
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once
closeRef beat.CloseRef // extern closeRef for sending a signal that the client should be closed.

Check failure on line 47 in libbeat/publisher/pipeline/client.go

View workflow job for this annotation

GitHub Actions / lint (windows)

field `closeRef` is unused (unused)
done chan struct{} // the done channel will be closed if the closeReg gets closed, or Close is run.
// done chan struct{} // the done channel will be closed if the closeReg gets closed, or Close is run.

observer observer
eventListener beat.EventListener
Expand Down Expand Up @@ -137,7 +137,9 @@ func (c *client) Close() error {
// first stop ack handling. ACK handler might block on wait (with timeout), waiting
// for pending events to be ACKed.
c.closeOnce.Do(func() {
close(c.done)
// This is not needed any more as the pipeline does not
// keep any list of clients
// close(c.done)

c.isOpen.Store(false)
c.onClosing()
Expand Down
95 changes: 4 additions & 91 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package pipeline

import (
"fmt"
"reflect"
"sync"
"time"

Expand Down Expand Up @@ -197,9 +196,6 @@ func (p *Pipeline) Close() error {
p.outputController.Close()

p.observer.cleanup()
if p.sigNewClient != nil {
close(p.sigNewClient)
}
return nil
}

Expand All @@ -212,6 +208,8 @@ func (p *Pipeline) Connect() (beat.Client, error) {
// The client behavior on close and ACK handling can be configured by setting
// the appropriate fields in the passed ClientConfig.
// If not set otherwise the defaut publish mode is OutputChooses.
//
// It is responsibility of the caller to close the client.
func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
var (
canDrop bool
Expand All @@ -238,9 +236,8 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
}

client := &client{
logger: p.monitors.Logger,
closeRef: cfg.CloseRef,
done: make(chan struct{}),
logger: p.monitors.Logger,
// closeRef: cfg.CloseRef,
isOpen: atomic.MakeBool(true),
clientListener: cfg.ClientListener,
processors: processors,
Expand Down Expand Up @@ -295,93 +292,9 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
}

p.observer.clientConnected()

if client.closeRef != nil {
p.registerSignalPropagation(client)
}

return client, nil
}

func (p *Pipeline) registerSignalPropagation(c *client) {
p.guardStartSigPropagation.Do(func() {
p.sigNewClient = make(chan *client, 1)
go p.runSignalPropagation()
})
p.sigNewClient <- c
}

func (p *Pipeline) runSignalPropagation() {
var channels []reflect.SelectCase
var clients []*client

channels = append(channels, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(p.sigNewClient),
})

for {
chosen, recv, recvOK := reflect.Select(channels)
if chosen == 0 {
if !recvOK {
// sigNewClient was closed
return
}

// new client -> register client for signal propagation.
if client := recv.Interface().(*client); client != nil {
channels = append(channels,
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(client.closeRef.Done()),
},
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(client.done),
},
)
clients = append(clients, client)
}
continue
}

// find client we received a signal for. If client.done was closed, then
// we have to remove the client only. But if closeRef did trigger the signal, then
// we have to propagate the async close to the client.
// In either case, the client will be removed

i := (chosen - 1) / 2
isSig := (chosen & 1) == 1
if isSig {
client := clients[i]
client.Close()
}

// remove:
last := len(clients) - 1
ch1 := i*2 + 1
ch2 := ch1 + 1
lastCh1 := last*2 + 1
lastCh2 := lastCh1 + 1

clients[i], clients[last] = clients[last], nil
channels[ch1], channels[lastCh1] = channels[lastCh1], reflect.SelectCase{}
channels[ch2], channels[lastCh2] = channels[lastCh2], reflect.SelectCase{}

clients = clients[:last]
channels = channels[:lastCh1]
if cap(clients) > 10 && len(clients) <= cap(clients)/2 {
clientsTmp := make([]*client, len(clients))
copy(clientsTmp, clients)
clients = clientsTmp

channelsTmp := make([]reflect.SelectCase, len(channels))
copy(channelsTmp, channels)
channels = channelsTmp
}
}
}

func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) {
if p.processors == nil {
return nil, nil
Expand Down
101 changes: 101 additions & 0 deletions libbeat/publisher/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,113 @@
package pipeline

import (
"context"
"runtime"
"sync"
"testing"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/tests/resources"
"github.com/elastic/elastic-agent-libs/mapstr"
)

func TestPipelineAcceptsAnyNumberOfClients(t *testing.T) {
if testing.Short() {
t.Skip("Skipping because short is enabled")
}

routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

pipeline := makePipeline(t, Settings{}, makeDiscardQueue())

defer pipeline.Close()
ctx, cancel := context.WithCancel(context.Background())

n := 20 //35000 // This needs to be more than 10
clients := []beat.Client{}
for i := 0; i < n; i++ {
c, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx,
})
if err != nil {
t.Fatalf("Could not connect to pipeline: %s", err)
}
clients = append(clients, c)
}

for i, c := range clients {
c.Publish(beat.Event{
Fields: mapstr.M{
"count": i,
},
})
}

// Close the first 105 clients
nn := 6
tmpC := clients[:n]
clients = clients[nn:]

for _, c := range tmpC {
c.Close()
}
runtime.Gosched()
runtime.Gosched()

cancel()

// Make sure all clients are closed
for _, c := range clients {
c.Close()
}
}

// makeDiscardQueue returns a queue that always discards all events
// the producers are assigned an unique incremental ID, when their
// close method is called, this ID is returned
func makeDiscardQueue() queue.Queue {
var wg sync.WaitGroup
producerID := atomic.NewInt(0)

return &testQueue{
close: func() error {
// Wait for all producers to finish
wg.Wait()
return nil
},
get: func(count int) (queue.Batch, error) {
return nil, nil
},

producer: func(cfg queue.ProducerConfig) queue.Producer {
producerID.Inc()
id := producerID.Load()

// count is a counter that increments on every published event
// it's also the returned Event ID
count := uint64(0)
var producer *testProducer

Check failure on line 109 in libbeat/publisher/pipeline/pipeline_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

S1021: should merge variable declaration with assignment on next line (gosimple)
producer = &testProducer{
publish: func(try bool, event interface{}) (queue.EntryID, bool) {
count++
return queue.EntryID(count), true
},
cancel: func() int {

wg.Done()
return id
},
}

wg.Add(1)
return producer
},
}
}

type testQueue struct {
close func() error
bufferConfig func() queue.BufferConfig
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awscloudwatch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)

// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: inputContext.Cancelation,
// CloseRef: inputContext.Cancelation,
EventListener: awscommon.NewEventACKHandler(),
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
if in.config.BucketARN != "" || in.config.NonAWSBucketName != "" {
// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: inputContext.Cancelation,
// CloseRef: inputContext.Cancelation,
EventListener: awscommon.NewEventACKHandler(),
Processing: beat.ProcessingConfig{
// This input only produces events with basic types so normalization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,13 @@ func (n *input) Run(runCtx v2.Context, connector beat.PipelineConnector) (err er
}()

client, err := connector.ConnectWith(beat.ClientConfig{
CloseRef: runCtx.Cancelation,
// CloseRef: runCtx.Cancelation,
EventListener: NewTxACKHandler(),
})
if err != nil {
return fmt.Errorf("could not connect to publishing pipeline: %s", err)

Check failure on line 72 in x-pack/filebeat/input/entityanalytics/internal/kvstore/input.go

View workflow job for this annotation

GitHub Actions / lint (windows)

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)
}
defer client.Close()

dataDir := paths.Resolve(paths.Data, "kvstore")
if err = os.MkdirAll(dataDir, 0700); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/lumberjack/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (i *lumberjackInput) Run(inputCtx inputv2.Context, pipeline beat.Pipeline)

// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: inputCtx.Cancelation,
// CloseRef: inputCtx.Cancelation,
EventListener: newEventACKHandler(),
})
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion x-pack/filebeat/input/netflow/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,15 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
// is not required.
EventNormalization: boolPtr(false),
},
CloseRef: ctx.Cancelation,
// CloseRef: ctx.Cancelation,
EventListener: nil,
})
if err != nil {
n.logger.Errorw("Failed connecting to beat event publishing", "error", err)
n.stop()
return err
}
defer client.Close()

const pollInterval = time.Minute
udpMetrics := netmetrics.NewUDP("netflow", ctx.ID, n.cfg.Host, uint64(n.cfg.ReadBuffer), pollInterval, n.logger)
Expand Down
3 changes: 2 additions & 1 deletion x-pack/filebeat/input/shipper/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,12 @@ func (in *shipperInput) Run(inputContext v2.Context, pipeline beat.Pipeline) err
DisableType: true,
},

CloseRef: inputContext.Cancelation,
// CloseRef: inputContext.Cancelation,
})
if err != nil {
return fmt.Errorf("error creating client for stream %s: %w", streamID, err)
}
defer client.Close()
in.log.Infof("Creating beat client for stream %s", streamID)

newStreamData := streamData{client: client, index: in.streams[streamID].index, processors: in.streams[streamID].processors}
Expand Down

0 comments on commit 4210d81

Please sign in to comment.