Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scan older files to be harvested based on mod time #4374

Merged
merged 11 commits into from
Jun 27, 2017
2 changes: 2 additions & 0 deletions filebeat/prospector/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type config struct {
// Harvester
BufferSize int `config:"harvester_buffer_size"`
Encoding string `config:"encoding"`
ScanOrder string `config:"scan.order"`
ScanSort string `config:"scan.sort"`

ExcludeLines []match.Matcher `config:"exclude_lines"`
IncludeLines []match.Matcher `config:"include_lines"`
Expand Down
58 changes: 53 additions & 5 deletions filebeat/prospector/log/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"path/filepath"
"time"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you remove newline?

"sort"
"strings"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
Expand Down Expand Up @@ -264,10 +267,55 @@ func (p *Prospector) matchesFile(filePath string) bool {
return false
}

type FileSortInfo struct {
info os.FileInfo
path string
}

func getSortInfos(paths map[string]os.FileInfo) []FileSortInfo {

sortInfos := make([]FileSortInfo, 0, len(paths))
for path, info := range paths {
sortInfo := FileSortInfo{info: info, path: path}
sortInfos = append(sortInfos, sortInfo)
}

return sortInfos
}

func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) []FileSortInfo {
switch scanOrder + "_" + scanSort {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you perhaps to that in two switch / case? So first check the scanSort and then the order inside each? It feels a bit strange to create a string for comparison. Perhaps there is an even "shorter" option.

You could move the sort.Slice part to after switch and only define the function which will be used inside the switch cases. This will make the cod cleaner I think.

case "asc_modtime":
sort.Slice(sortInfos, func(i, j int) bool {
return sortInfos[i].info.ModTime().Before(sortInfos[j].info.ModTime())
})
case "desc_modtime":
sort.Slice(sortInfos, func(i, j int) bool {
return sortInfos[i].info.ModTime().After(sortInfos[j].info.ModTime())
})
case "asc_filename":
sort.Slice(sortInfos, func(i, j int) bool {
return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) < 0
})
case "desc_filename":
sort.Slice(sortInfos, func(i, j int) bool {
return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) > 0
})
default:
}

return sortInfos
}

// Scan starts a scanGlob for each provided path/glob
func (p *Prospector) scan() {

for path, info := range p.getFiles() {
paths := p.getFiles()
files := getSortInfos(paths)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not happen in every call, only if sorting is enabled as this could have a performance impact. I would expect something like:

if config.sort.Enabled() { paths := paths.Sort()

This is heavily simplified but you get what I'm trying to explain? Do we even need to expose the fileinfo outside of the sort?

files = getSortedFiles(strings.ToLower(p.config.ScanOrder),
strings.ToLower(p.config.ScanSort),
files)
for _, fileInfo := range files {

select {
case <-p.done:
Expand All @@ -277,14 +325,14 @@ func (p *Prospector) scan() {
}

var err error
path, err = filepath.Abs(path)
fileInfo.path, err = filepath.Abs(fileInfo.path)
if err != nil {
logp.Err("could not fetch abs path for file %s: %s", path, err)
logp.Err("could not fetch abs path for file %s: %s", fileInfo.path, err)
}
logp.Debug("prospector", "Check file for harvesting: %s", path)
logp.Debug("prospector", "Check file for harvesting: %s", fileInfo.path)

// Create new state for comparison
newState := file.NewState(info, path, p.config.Type)
newState := file.NewState(fileInfo.info, fileInfo.path, p.config.Type)

// Load last state
lastState := p.states.FindPrevious(newState)
Expand Down