Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

draft: Reverify chunks next #2346

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1
github.com/BobuSumisu/aho-corasick v1.0.3
github.com/TheZeroSlave/zapsentry v1.19.0
github.com/adrg/strutil v0.3.1
github.com/alecthomas/kingpin/v2 v2.4.0
github.com/aws/aws-sdk-go v1.50.0
github.com/aymanbagabas/go-osc52 v1.2.2
Expand Down Expand Up @@ -72,6 +73,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tailscale/depaware v0.0.0-20210622194025-720c4b409502
github.com/trufflesecurity/disk-buffer-reader v0.2.1
github.com/wasilibs/go-re2 v1.4.1
github.com/xanzy/go-gitlab v0.94.0
go.mongodb.org/mongo-driver v1.12.1
go.uber.org/mock v0.3.0
Expand Down Expand Up @@ -239,7 +241,6 @@ require (
github.com/therootcompany/xz v1.0.1 // indirect
github.com/ulikunitz/xz v0.5.11 // indirect
github.com/vbatts/tar-split v0.11.3 // indirect
github.com/wasilibs/go-re2 v1.4.1 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
Expand Down
50 changes: 6 additions & 44 deletions go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
concurrency = cli.Flag("concurrency", "Number of concurrent workers.").Default(strconv.Itoa(runtime.NumCPU())).Int()
noVerification = cli.Flag("no-verification", "Don't verify the results.").Bool()
onlyVerified = cli.Flag("only-verified", "Only output verified results.").Bool()
forceReverification = cli.Flag("force-reverification", "Verify credentials when multiple similar credentials are found across detectors.").Bool()
filterUnverified = cli.Flag("filter-unverified", "Only output first unverified result per chunk per detector if there are more than one results.").Bool()
filterEntropy = cli.Flag("filter-entropy", "Filter unverified results with Shannon entropy. Start with 3.0.").Float64()
configFilename = cli.Flag("config", "Path to configuration file.").ExistingFile()
Expand Down Expand Up @@ -409,6 +410,7 @@ func run(state overseer.State) {
engine.WithPrintAvgDetectorTime(*printAvgDetectorTime),
engine.WithPrinter(printer),
engine.WithFilterEntropy(*filterEntropy),
engine.WithForceReverification(*forceReverification),
)
if err != nil {
logFatal(err, "error initializing engine")
Expand Down
10 changes: 8 additions & 2 deletions pkg/engine/ahocorasickcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strings"

ahocorasick "github.com/BobuSumisu/aho-corasick"

"github.com/trufflesecurity/trufflehog/v3/pkg/custom_detectors"
"github.com/trufflesecurity/trufflehog/v3/pkg/detectors"
"github.com/trufflesecurity/trufflehog/v3/pkg/pb/detectorspb"
Expand Down Expand Up @@ -65,12 +66,17 @@ func NewAhoCorasickCore(allDetectors []detectors.Detector) *AhoCorasickCore {
// PopulateMatchingDetectors populates the given detector slice with all the detectors matching the
// provided input. This method populates an existing map rather than allocating a new one because
// it will be called once per chunk and that many allocations has a noticeable performance cost.
func (ac *AhoCorasickCore) PopulateMatchingDetectors(chunkData string, detectors map[DetectorKey]detectors.Detector) {
func (ac *AhoCorasickCore) PopulateMatchingDetectors(chunkData string, dts map[DetectorKey]detectors.Detector) []detectors.Detector {
matches := ac.prefilter.MatchString(strings.ToLower(chunkData))
d := make([]detectors.Detector, 0, len(matches))
for _, m := range ac.prefilter.MatchString(strings.ToLower(chunkData)) {
for _, k := range ac.keywordsToDetectors[m.MatchString()] {
detectors[k] = ac.detectorsByKey[k]
dts[k] = ac.detectorsByKey[k]
d = append(d, ac.detectorsByKey[k])
}
}

return d
}

// createDetectorKey creates a unique key for each detector from its type, version, and, for
Expand Down
224 changes: 217 additions & 7 deletions pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync/atomic"
"time"

"github.com/adrg/strutil"
"github.com/adrg/strutil/metrics"
lru "github.com/hashicorp/golang-lru"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -62,18 +64,21 @@ type Engine struct {
// entropyFilter is used to filter out unverified results using Shannon entropy.
filterEntropy *float64
onlyVerified bool
forceReverification bool
printAvgDetectorTime bool

// ahoCorasickHandler manages the Aho-Corasick trie and related keyword lookups.
ahoCorasickCore *AhoCorasickCore

// Engine synchronization primitives.
sourceManager *sources.SourceManager
results chan detectors.ResultWithMetadata
detectableChunksChan chan detectableChunk
workersWg sync.WaitGroup
wgDetectorWorkers sync.WaitGroup
WgNotifier sync.WaitGroup
sourceManager *sources.SourceManager
results chan detectors.ResultWithMetadata
detectableChunksChan chan detectableChunk
reverifiableChunksChan chan reVerifiableChunk
workersWg sync.WaitGroup
reverifiersWg sync.WaitGroup
wgDetectorWorkers sync.WaitGroup
WgNotifier sync.WaitGroup

// Runtime information.
metrics runtimeMetrics
Expand All @@ -91,6 +96,20 @@ type Engine struct {

// verify determines whether the scanner will attempt to verify candidate secrets
verify bool

// Note: bad hack only used for testing
reverificationTracking *reverificationTracking
}

// reverificationTracking counts likely-duplicate secrets observed during
// reverification. Note: bad hack only used for testing.
type reverificationTracking struct {
	reverificationDuplicateCount int // guarded by mu
	mu                           sync.Mutex
}

// increment bumps the duplicate counter; safe for concurrent use.
func (r *reverificationTracking) increment() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.reverificationDuplicateCount++
}

// Option is used to configure the engine during initialization using functional options.
Expand Down Expand Up @@ -181,6 +200,21 @@ func WithVerify(verify bool) Option {
}
}

// withReverificationTracking enables counting of likely-duplicate secrets
// encountered during reverification. Only used by tests.
func withReverificationTracking() Option {
	return func(e *Engine) {
		e.reverificationTracking = new(reverificationTracking)
	}
}

// WithForceReverification sets whether candidate credentials should be
// verified even when multiple similar credentials are found across
// detectors. When disabled (the default), likely-duplicate secrets matched
// by more than one detector are sent through the detection pipeline with
// verification turned off.
func WithForceReverification(forceReverification bool) Option {
	return func(e *Engine) {
		e.forceReverification = forceReverification
	}
}

func filterDetectors(filterFunc func(detectors.Detector) bool, input []detectors.Detector) []detectors.Detector {
var out []detectors.Detector
for _, detector := range input {
Expand Down Expand Up @@ -303,6 +337,7 @@ func (e *Engine) initialize(ctx context.Context, options ...Option) error {
// Channels are used for communication between different parts of the engine,
// ensuring that data flows smoothly without race conditions.
e.detectableChunksChan = make(chan detectableChunk, defaultChannelBuffer)
e.reverifiableChunksChan = make(chan reVerifiableChunk, defaultChannelBuffer)
e.results = make(chan detectors.ResultWithMetadata, defaultChannelBuffer)
e.dedupeCache = cache
e.printer = new(output.PlainPrinter)
Expand Down Expand Up @@ -392,6 +427,18 @@ func (e *Engine) startWorkers(ctx context.Context) {
}()
}

// reverifiers...
ctx.Logger().V(2).Info("starting reverifier workers", "count", e.concurrency)
for worker := uint64(0); worker < uint64(e.concurrency); worker++ {
e.reverifiersWg.Add(1)
go func() {
ctx := context.WithValue(ctx, "secret_worker_id", common.RandomID(5))
defer common.Recover(ctx)
defer e.reverifiersWg.Done()
e.reverifierWorker(ctx)
}()
}

// Notifier workers communicate detected issues to the user or any downstream systems.
// We want 1/4th of the notifier workers as the number of scanner workers.
const notifierWorkerRatio = 4
Expand Down Expand Up @@ -420,6 +467,10 @@ func (e *Engine) Finish(ctx context.Context) error {
err := e.sourceManager.Wait()

e.workersWg.Wait() // Wait for the workers to finish scanning chunks.

close(e.reverifiableChunksChan)
e.reverifiersWg.Wait()

close(e.detectableChunksChan)
e.wgDetectorWorkers.Wait() // Wait for the detector workers to finish detecting chunks.

Expand Down Expand Up @@ -458,8 +509,16 @@ type detectableChunk struct {
wgDoneFn func()
}

// reVerifiableChunk bundles a decoded chunk with every detector that matched
// it, so a reverifier worker can screen for the same secret being reported by
// multiple detectors before any verification happens.
type reVerifiableChunk struct {
	// chunk is the data chunk whose detector matches need duplicate screening.
	chunk sources.Chunk
	// decoder identifies which decoder produced the chunk data.
	decoder detectorspb.DecoderType
	// detectors holds all detectors whose keywords matched the chunk.
	detectors []detectors.Detector
	// reverifyWgDoneFn is invoked when the reverifier worker has finished
	// processing this chunk (the sending detectorWorker waits on it).
	reverifyWgDoneFn func()
}

func (e *Engine) detectorWorker(ctx context.Context) {
var wgDetect sync.WaitGroup
var wgReverify sync.WaitGroup

// Reuse the same map to avoid allocations.
const avgDetectorsPerChunk = 2
Expand All @@ -474,7 +533,21 @@ func (e *Engine) detectorWorker(ctx context.Context) {
continue
}

e.ahoCorasickCore.PopulateMatchingDetectors(string(decoded.Chunk.Data), chunkSpecificDetectors)
matchingDetectors := e.ahoCorasickCore.PopulateMatchingDetectors(string(decoded.Chunk.Data), chunkSpecificDetectors)
if len(chunkSpecificDetectors) > 1 && !e.forceReverification {
wgReverify.Add(1)
e.reverifiableChunksChan <- reVerifiableChunk{
chunk: *decoded.Chunk,
detectors: matchingDetectors,
decoder: decoded.DecoderType,
reverifyWgDoneFn: wgReverify.Done,
}
// Empty the map.
for k := range chunkSpecificDetectors {
delete(chunkSpecificDetectors, k)
}
continue
}

for k, detector := range chunkSpecificDetectors {
decoded.Chunk.Verify = e.verify
Expand All @@ -491,10 +564,147 @@ func (e *Engine) detectorWorker(ctx context.Context) {
}
atomic.AddUint64(&e.metrics.ChunksScanned, 1)
}

wgReverify.Wait()
wgDetect.Wait()
ctx.Logger().V(4).Info("finished scanning chunks")
}

func likelyDuplicate(val string, dupesSlice []string) bool {
for _, v := range dupesSlice {
if v == val {
fmt.Println("found exact duplicate", val, v)
return true
}
similarity := strutil.Similarity(val, v, metrics.NewLevenshtein())

// close enough
if similarity > 0.9 {
fmt.Println("found similar duplicate", val, v, similarity)
return true
}
}
return false
}

// reverifierWorker screens chunks matched by more than one detector for
// duplicate secrets before verification. For each chunk it re-runs every
// candidate detector with verification disabled, then compares each
// detector's raw results against every other detector's results using
// string similarity. Detectors whose results duplicate another detector's
// are forwarded to the detection pipeline with verification forced off;
// the rest are forwarded with the engine's configured verification setting.
func (e *Engine) reverifierWorker(ctx context.Context) {
	var wgDetect sync.WaitGroup

	// Reuse the same map and slice across chunks to avoid allocations.
	const avgSecretsPerDetector = 8

	detectorsToVerify := make(map[string]detectors.Detector, avgSecretsPerDetector)
	chunkSecrets := make([]string, 0, avgSecretsPerDetector)

	var wgReverify sync.WaitGroup
	dMu := sync.Mutex{}
	detectorResults := make(map[string][]detectors.Result, avgSecretsPerDetector)

	for chunk := range e.reverifiableChunksChan {
		// Phase 1: run every matching detector concurrently WITHOUT
		// verification to collect its raw results.
		for _, detector := range chunk.detectors {
			detectorsToVerify[detector.Type().String()] = detector
			detector := detector // capture loop variable for the goroutine
			wgReverify.Add(1)
			go func() {
				defer wgReverify.Done()
				// DO NOT VERIFY at this stage of the pipeline.
				results, err := detector.FromData(ctx, false, chunk.chunk.Data)
				if err != nil {
					ctx.Logger().Error(err, "error scanning chunk")
				}
				if len(results) == 0 {
					return
				}
				dMu.Lock()
				detectorResults[detector.Type().String()] = results
				dMu.Unlock()
			}()
		}
		wgReverify.Wait()

		// Phase 2: compare each detector's results against every OTHER
		// detector's results to spot the same secret found twice.
		for _, detector := range chunk.detectors {
			results := detectorResults[detector.Type().String()]

			// Collect all raw secret values that are NOT from this detector.
			// Reset first so values from a previous iteration (or chunk)
			// cannot leak in and cause false duplicate hits.
			chunkSecrets = chunkSecrets[:0]
			for k, v := range detectorResults {
				if k == detector.Type().String() {
					continue
				}
				for _, res := range v {
					var val []byte
					if res.RawV2 != nil {
						val = res.RawV2
					} else {
						val = res.Raw
					}
					chunkSecrets = append(chunkSecrets, string(val))
				}
			}

			likelyDup := false

			for _, res := range results {
				var val []byte
				if res.RawV2 != nil {
					val = res.RawV2
				} else {
					val = res.Raw
				}

				// Use levenshtein distance to determine if the secret is likely the same.
				// Ex:
				// - postman api key: PMAK-qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r
				// - malicious detector "api key": qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r
				if likelyDuplicate(string(val), chunkSecrets) {
					// This indicates that the same secret was found by multiple detectors.
					// We should NOT VERIFY this chunk's data.
					if e.reverificationTracking != nil {
						e.reverificationTracking.increment()
					}
					likelyDup = true
				}
			}

			if likelyDup {
				// Forward with verification disabled and remove the detector
				// from the set that gets verified below.
				wgDetect.Add(1)
				chunk.chunk.Verify = false // DO NOT VERIFY
				e.detectableChunksChan <- detectableChunk{
					chunk:    chunk.chunk,
					detector: detector,
					decoder:  chunk.decoder,
					wgDoneFn: wgDetect.Done,
				}

				delete(detectorsToVerify, detector.Type().String())
			}
		}

		// Phase 3: everything remaining had no duplicates; forward with the
		// engine's configured verification setting.
		for k, detector := range detectorsToVerify {
			wgDetect.Add(1)
			chunk.chunk.Verify = e.verify
			e.detectableChunksChan <- detectableChunk{
				chunk:    chunk.chunk,
				detector: detector,
				decoder:  chunk.decoder,
				wgDoneFn: wgDetect.Done,
			}
			delete(detectorsToVerify, k)
		}

		// Reset per-chunk state before the next chunk.
		for k := range detectorResults {
			delete(detectorResults, k)
		}
		chunk.reverifyWgDoneFn()
	}

	wgDetect.Wait()
	ctx.Logger().V(4).Info("finished reverifying chunks")
}

func (e *Engine) detectChunks(ctx context.Context) {
for data := range e.detectableChunksChan {
e.detectChunk(ctx, data)
Expand Down
Loading
Loading