diff --git a/operator/builtin/transformer/filter/filter.go b/operator/builtin/transformer/filter/filter.go
index 20e804609..e0bb2cf92 100644
--- a/operator/builtin/transformer/filter/filter.go
+++ b/operator/builtin/transformer/filter/filter.go
@@ -2,8 +2,9 @@ package filter
 
 import (
 	"context"
+	"crypto/rand"
 	"fmt"
-	"math/rand"
+	"math/big"
 
 	"github.com/antonmedv/expr"
 	"github.com/antonmedv/expr/vm"
@@ -17,6 +18,10 @@ func init() {
 	operator.Register("filter", func() operator.Builder { return NewFilterOperatorConfig("") })
 }
 
+var upperBound = big.NewInt(1000)
+
+var randInt = rand.Int // allow override for testing
+
 // NewFilterOperatorConfig creates a filter operator config with default values
 func NewFilterOperatorConfig(operatorID string) *FilterOperatorConfig {
 	return &FilterOperatorConfig{
@@ -51,7 +56,7 @@ func (c FilterOperatorConfig) Build(context operator.BuildContext) ([]operator.O
 	filterOperator := &FilterOperator{
 		TransformerOperator: transformer,
 		expression:          compiledExpression,
-		dropRatio:           c.DropRatio,
+		dropCutoff:          big.NewInt(int64(c.DropRatio * 1000)),
 	}
 
 	return []operator.Operator{filterOperator}, nil
@@ -61,7 +66,7 @@ func (c FilterOperatorConfig) Build(context operator.BuildContext) ([]operator.O
 type FilterOperator struct {
 	helper.TransformerOperator
 	expression *vm.Program
-	dropRatio  float64
+	dropCutoff *big.Int // [0..1000)
 }
 
 // Process will drop incoming entries that match the filter expression
@@ -81,7 +86,17 @@ func (f *FilterOperator) Process(ctx context.Context, entry *entry.Entry) error
 		return nil
 	}
 
-	if !filtered || rand.Float64() > f.dropRatio {
+	if !filtered {
+		f.Write(ctx, entry)
+		return nil
+	}
+
+	i, err := randInt(rand.Reader, upperBound)
+	if err != nil {
+		return err
+	}
+
+	if i.Cmp(f.dropCutoff) >= 0 {
 		f.Write(ctx, entry)
 	}
 
diff --git a/operator/builtin/transformer/filter/filter_test.go b/operator/builtin/transformer/filter/filter_test.go
index 07b91abf0..f3d409cb0 100644
--- a/operator/builtin/transformer/filter/filter_test.go
+++ b/operator/builtin/transformer/filter/filter_test.go
@@ -2,7 +2,8 @@ package filter
 
 import (
 	"context"
-	"math/rand"
+	"io"
+	"math/big"
 	"os"
 	"testing"
 
@@ -141,14 +142,21 @@ func TestFilterDropRatio(t *testing.T) {
 		},
 	}
 
+	nextIndex := 0
+	randos := []int64{250, 750}
+	randInt = func(io.Reader, *big.Int) (*big.Int, error) {
+		defer func() {
+			nextIndex = (nextIndex + 1) % len(randos)
+		}()
+		return big.NewInt(randos[nextIndex]), nil
+	}
+
 	for i := 1; i < 11; i++ {
-		rand.Seed(1)
 		err = filterOperator.Process(context.Background(), testEntry)
 		require.NoError(t, err)
 	}
 
 	for i := 1; i < 11; i++ {
-		rand.Seed(2)
 		err = filterOperator.Process(context.Background(), testEntry)
 		require.NoError(t, err)
 	}