Skip to content

Commit

Permalink
logs: proper stop sequence for the processor to delete the SDS instan…
Browse files Browse the repository at this point in the history
…ce. (#24615)

* logs: proper stop sequence for the processor to delete the SDS instance.

* Add an unit test reproducing the race which has been fixed.
  • Loading branch information
remeh authored Apr 12, 2024
1 parent 9d5da02 commit 51ce821
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 4 deletions.
6 changes: 4 additions & 2 deletions pkg/logs/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@ func (p *Processor) Start() {
// Stop stops the Processor,
// this call blocks until inputChan is flushed
func (p *Processor) Stop() {
close(p.inputChan)
<-p.done
// once the processor mainloop is not running, it's safe
// to delete the sds scanner instance.
if p.sds != nil {
p.sds.Delete()
p.sds = nil
}
close(p.inputChan)
<-p.done
}

// Flush processes synchronously the messages that this processor has to process.
Expand Down
10 changes: 8 additions & 2 deletions pkg/logs/sds/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ const SDSEnabled = true
// has to ensure of the thread safety.
type Scanner struct {
*sds.Scanner
// lock used to separate between the lifecycle of the scanner (Reconfigure, Delete)
// and the use of the scanner (Scan).
sync.Mutex

// standard rules as received through the remote configuration, indexed
// by the standard rule ID for O(1) access when receiving user configurations.
standardRules map[string]StandardRuleConfig
// rawConfig is the raw config previously received through RC.
rawConfig []byte
Expand Down Expand Up @@ -287,8 +290,11 @@ func (s *Scanner) GetRuleByIdx(idx uint32) (RuleConfig, error) {
}

// Delete deallocates the internal SDS scanner.
// This method is NOT thread safe, caller has to ensure the thread safety.
// This method is thread safe, a reconfiguration or a scan can't happen at the same time.
func (s *Scanner) Delete() {
s.Lock()
defer s.Unlock()

if s.Scanner != nil {
s.Scanner.Delete()
s.rawConfig = nil
Expand Down
94 changes: 94 additions & 0 deletions pkg/logs/sds/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package sds
import (
"bytes"
"testing"
"time"

"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -359,3 +360,96 @@ func TestScan(t *testing.T) {
require.Equal(string(processed), v.event, "incorrect result")
}
}

// TestCloseCycleScan validates that the close cycle works well (not blocking, not racing).
// by trying hard to reproduce a possible race on close.
func TestCloseCycleScan(t *testing.T) {
require := require.New(t)

standardRules := []byte(`
{"priority":1,"rules":[
{
"id":"zero-0",
"description":"zero desc",
"name":"zero",
"pattern":"zero"
}
]}
`)
agentConfig := []byte(`
{"is_enabled":true,"rules":[
{
"id":"random-00000",
"definition":{"standard_rule_id":"zero-0"},
"name":"zero",
"match_action":{"type":"Redact","placeholder":"[redacted]"},
"is_enabled":true
},{
"id":"random-11111",
"definition":{"standard_rule_id":"zero-0"},
"name":"one",
"match_action":{"type":"Redact","placeholder":"[REDACTED]"},
"is_enabled":true
}
]}
`)

// scanner creation
// -----

for i := 0; i < 10; i++ {
s := CreateScanner()
require.NotNil(s, "the returned scanner should not be nil")

_ = s.Reconfigure(ReconfigureOrder{
Type: StandardRules,
Config: standardRules,
})
_ = s.Reconfigure(ReconfigureOrder{
Type: AgentConfig,
Config: agentConfig,
})

require.True(s.IsReady(), "at this stage, the scanner should be considered ready")
type result struct {
matched bool
event string
matchCount int
}

tests := map[string]result{
"one two three go!": {
matched: true,
event: "[REDACTED] two three go!",
matchCount: 1,
},
"after zero comes one, after one comes two, and the rest is history": {
matched: true,
event: "after [redacted] comes [REDACTED], after [REDACTED] comes two, and the rest is history",
matchCount: 3,
},
"and so we go": {
matched: false,
event: "",
matchCount: 0,
},
}

go func() {
for {
for k, _ := range tests {
msg := message.Message{}
if s.IsReady() {
_, _, err := s.Scan([]byte(k), &msg)
require.NoError(err)
} else {
return
}
}
}
}()

time.Sleep(100 * time.Millisecond)
s.Delete()
}
}

0 comments on commit 51ce821

Please sign in to comment.