Skip to content

Commit

Permalink
adding mtime age while fetching files using doublestar and added refr…
Browse files Browse the repository at this point in the history
…esh interval to prevent fetching files each pool
  • Loading branch information
lokesh.balla committed Apr 23, 2024
1 parent 00a15b1 commit 0b4eea2
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 29 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/open-telemetry/opentelemetry-collector-contrib

go 1.21

replace github.com/bmatcuk/doublestar/v4 => github.com/opsramp/doublestar/v4 v4.0.0-20240422124750-f0571f1928b4

require (
github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector v0.98.0
github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector v0.98.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions pkg/stanza/fileconsumer/matcher/internal/finder/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package finder // import "github.com/open-telemetry/opentelemetry-collector-cont
import (
"errors"
"fmt"
"time"

"github.com/bmatcuk/doublestar/v4"
)
Expand All @@ -21,16 +22,16 @@ func Validate(globs []string) error {
}

// FindFiles gets a list of paths given an array of glob patterns to include and exclude
func FindFiles(includes []string, excludes []string) ([]string, error) {
func FindFiles(includes []string, excludes []string, maxAge time.Duration) ([]string, error) {
var errs error
all := make([]string, 0, len(includes))
for _, include := range includes {
matches, err := doublestar.FilepathGlob(include, doublestar.WithFilesOnly(), doublestar.WithFailOnIOErrors())
matches, err := doublestar.FilepathGlob(include, doublestar.WithFilesOnly(), doublestar.WithFailOnIOErrors(), doublestar.WithMaxAge(maxAge))

Check failure on line 29 in pkg/stanza/fileconsumer/matcher/internal/finder/finder.go

View workflow job for this annotation

GitHub Actions / govulncheck (cmd-1)

undefined: doublestar.WithMaxAge

Check failure on line 29 in pkg/stanza/fileconsumer/matcher/internal/finder/finder.go

View workflow job for this annotation

GitHub Actions / govulncheck (cmd-0)

undefined: doublestar.WithMaxAge

Check failure on line 29 in pkg/stanza/fileconsumer/matcher/internal/finder/finder.go

View workflow job for this annotation

GitHub Actions / govulncheck (receiver-0)

undefined: doublestar.WithMaxAge

Check failure on line 29 in pkg/stanza/fileconsumer/matcher/internal/finder/finder.go

View workflow job for this annotation

GitHub Actions / govulncheck (receiver-1)

undefined: doublestar.WithMaxAge
if err != nil {
errs = errors.Join(errs, fmt.Errorf("find files with '%s' pattern: %w", include, err))
// the same pattern could cause an IO error due to one file or directory,
// but also could still find files without `doublestar.WithFailOnIOErrors()`.
matches, _ = doublestar.FilepathGlob(include, doublestar.WithFilesOnly())
matches, _ = doublestar.FilepathGlob(include, doublestar.WithFilesOnly(), doublestar.WithMaxAge(maxAge))

Check failure on line 34 in pkg/stanza/fileconsumer/matcher/internal/finder/finder.go

View workflow job for this annotation

GitHub Actions / govulncheck (cmd-1)

undefined: doublestar.WithMaxAge

Check failure on line 34 in pkg/stanza/fileconsumer/matcher/internal/finder/finder.go

View workflow job for this annotation

GitHub Actions / govulncheck (receiver-0)

undefined: doublestar.WithMaxAge

Check failure on line 34 in pkg/stanza/fileconsumer/matcher/internal/finder/finder.go

View workflow job for this annotation

GitHub Actions / govulncheck (receiver-1)

undefined: doublestar.WithMaxAge
}
INCLUDE:
for _, match := range matches {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestFindFiles(t *testing.T) {
require.NoError(t, err)
require.NoError(t, file.Close())
}
files, err := FindFiles(tc.include, tc.exclude)
files, err := FindFiles(tc.include, tc.exclude, 0)
assert.NoError(t, err)
assert.Equal(t, tc.expected, files)
})
Expand Down Expand Up @@ -249,7 +249,7 @@ func TestFindFilesWithIOErrors(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
files, err := FindFiles(tc.include, []string{})
files, err := FindFiles(tc.include, []string{}, 0)
assert.ErrorContains(t, err, tc.failedMsg)
assert.Equal(t, tc.expected, files)
})
Expand Down
94 changes: 74 additions & 20 deletions pkg/stanza/fileconsumer/matcher/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type OrderingCriteria struct {
Regex string `mapstructure:"regex,omitempty"`
TopN int `mapstructure:"top_n,omitempty"`
SortBy []Sort `mapstructure:"sort_by,omitempty"`

RefreshInterval time.Duration `mapstructure:"refresh_interval,omitempty"`
}

type Sort struct {
Expand Down Expand Up @@ -70,13 +72,6 @@ func New(c Criteria) (*Matcher, error) {
return nil, fmt.Errorf("exclude: %w", err)
}

if len(c.OrderingCriteria.SortBy) == 0 {
return &Matcher{
include: c.Include,
exclude: c.Exclude,
}, nil
}

if c.OrderingCriteria.TopN < 0 {
return nil, fmt.Errorf("'top_n' must be a positive integer")
}
Expand All @@ -85,6 +80,20 @@ func New(c Criteria) (*Matcher, error) {
c.OrderingCriteria.TopN = defaultOrderingCriteriaTopN
}

if c.OrderingCriteria.RefreshInterval.Seconds() == 0 {
c.OrderingCriteria.RefreshInterval = time.Minute
}

if len(c.OrderingCriteria.SortBy) == 0 {
return &Matcher{
include: c.Include,
exclude: c.Exclude,
refreshInterval: c.OrderingCriteria.RefreshInterval,
topN: c.OrderingCriteria.TopN,
cache: newCache(),
}, nil
}

var regex *regexp.Regexp
if orderingCriteriaNeedsRegex(c.OrderingCriteria.SortBy) {
if c.OrderingCriteria.Regex == "" {
Expand All @@ -98,6 +107,7 @@ func New(c Criteria) (*Matcher, error) {
}
}

var maxAge time.Duration
var filterOpts []filter.Option
for _, sc := range c.OrderingCriteria.SortBy {
switch sc.SortType {
Expand All @@ -123,18 +133,22 @@ func New(c Criteria) (*Matcher, error) {
if !mtimeSortTypeFeatureGate.IsEnabled() {
return nil, fmt.Errorf("the %q feature gate must be enabled to use %q sort type", mtimeSortTypeFeatureGate.ID(), sortTypeMtime)
}
maxAge = sc.MaxTime
filterOpts = append(filterOpts, filter.SortMtime(sc.MaxTime))
default:
return nil, fmt.Errorf("'sort_type' must be specified")
}
}

return &Matcher{
include: c.Include,
exclude: c.Exclude,
regex: regex,
topN: c.OrderingCriteria.TopN,
filterOpts: filterOpts,
include: c.Include,
exclude: c.Exclude,
regex: regex,
refreshInterval: c.OrderingCriteria.RefreshInterval,
maxAge: maxAge,
topN: c.OrderingCriteria.TopN,
filterOpts: filterOpts,
cache: newCache(),
}, nil
}

Expand All @@ -149,36 +163,76 @@ func orderingCriteriaNeedsRegex(sorts []Sort) bool {
return false
}

// cache stores the matched files and last updated time. No mutex is used since all calls are sequential
type cache struct {
lastUpdatedTime time.Time

files []string
}

func newCache() *cache {
return &cache{}
}

func (c *cache) getFiles() []string {
return c.files
}

func (c *cache) update(files []string) {
c.files = files
c.lastUpdatedTime = time.Now()
}

func (c *cache) getLastUpdatedTime() time.Time {
return c.lastUpdatedTime
}

type Matcher struct {
include []string
exclude []string
regex *regexp.Regexp
topN int
filterOpts []filter.Option

refreshInterval time.Duration
maxAge time.Duration
cache *cache
}

// MatchFiles gets a list of paths given an array of glob patterns to include and exclude
func (m Matcher) MatchFiles() ([]string, error) {
var errs error
files, err := finder.FindFiles(m.include, m.exclude)
var err, errs error

files := m.cache.getFiles()
if time.Since(m.cache.getLastUpdatedTime()) < m.refreshInterval {
return files, nil
}

files, err = finder.FindFiles(m.include, m.exclude, m.maxAge)
if err != nil {
errs = errors.Join(errs, err)
}

if len(files) == 0 {
return files, errors.Join(fmt.Errorf("no files match the configured criteria"), errs)
}
if len(m.filterOpts) == 0 {
return files, errs
}

result, err := filter.Filter(files, m.regex, m.filterOpts...)
if len(result) == 0 {
return result, errors.Join(err, errs)
files, err = filter.Filter(files, m.regex, m.filterOpts...)
if len(files) == 0 {
return files, errors.Join(err, errs)
}

if len(result) <= m.topN {
return result, errors.Join(err, errs)
if len(files) <= m.topN {
m.cache.update(files)
return files, errors.Join(err, errs)
}

return result[:m.topN], errors.Join(err, errs)
files = files[:m.topN]

m.cache.update(files)

return files, errors.Join(err, errs)
}
2 changes: 2 additions & 0 deletions pkg/stanza/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza

go 1.21

replace github.com/bmatcuk/doublestar/v4 => github.com/opsramp/doublestar/v4 v4.0.0-20240422124750-f0571f1928b4

require (
github.com/bmatcuk/doublestar/v4 v4.6.1
github.com/cespare/xxhash/v2 v2.3.0
Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 0b4eea2

Please sign in to comment.