
scan older files to be harvested based on mod time #4374

Merged: 11 commits, Jun 27, 2017
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
@@ -132,6 +132,8 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...v6.0.0-alpha2[View comm
- Add `logging.files` `permissions` option. {pull}4295[4295]

*Filebeat*
- Added ability to sort harvested files. {pull}4374[4374]
- Add experimental Redis slow log prospector type. {pull}4180[4180]
Contributor: This line probably sneaked in during rebasing?


- Add experimental Redis slow log prospector type. {pull}4180[4180]

19 changes: 19 additions & 0 deletions filebeat/docs/reference/configuration/filebeat-options.asciidoc
@@ -302,6 +302,25 @@ If you require log lines to be sent in near real time do not use a very low `scan

The default setting is 10s.


[[scan-sort]]
===== scan.sort

Specifies whether files should be harvested in order and how to determine the order. Possible values are `modtime`, `filename`, and `none`. To sort by file modification time, use `modtime`; otherwise, use `filename`.

If you specify a value other than `none` for this setting, you can determine whether to use ascending or descending order by using `scan.order`.
Contributor: Could you add . dots at the end of the sentences? Also check below.


The default setting is `none`.


[[scan-order]]
===== scan.order

Specifies ascending or descending order if `scan.sort` is set to a value other than `none`. Possible values are `asc` or `desc`.

The default setting is `asc`.
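Taken together, a prospector configured to harvest the oldest files first might look like the following sketch. This is an illustrative assumption, not part of the PR: the path is hypothetical, and the exact top-level key (`filebeat.prospectors` vs. later `filebeat.inputs`) depends on the Filebeat version.

```yaml
filebeat.prospectors:
- type: log
  paths:
    - /var/log/app/*.log
  # Harvest files oldest-first by modification time.
  scan.sort: modtime
  scan.order: asc
```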


===== harvester_buffer_size

The size in bytes of the buffer that each harvester uses when fetching a file. The default is 16384.
4 changes: 4 additions & 0 deletions filebeat/prospector/log/config.go
@@ -29,6 +29,8 @@ var (
HarvesterLimit: 0,
Symlinks: false,
TailFiles: false,
ScanSort: "none",
ScanOrder: "asc",

// Harvester
BufferSize: 16 * humanize.KiByte,
@@ -69,6 +71,8 @@ type config struct {
// Harvester
BufferSize int `config:"harvester_buffer_size"`
Encoding string `config:"encoding"`
ScanOrder string `config:"scan.order"`
ScanSort string `config:"scan.sort"`

ExcludeLines []match.Matcher `config:"exclude_lines"`
IncludeLines []match.Matcher `config:"include_lines"`
111 changes: 101 additions & 10 deletions filebeat/prospector/log/prospector.go
@@ -5,6 +5,8 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"

"github.com/elastic/beats/filebeat/channel"
@@ -264,10 +266,107 @@ func (p *Prospector) matchesFile(filePath string) bool {
return false
}

type FileSortInfo struct {
info os.FileInfo
path string
}

func getSortInfos(paths map[string]os.FileInfo) []FileSortInfo {

sortInfos := make([]FileSortInfo, 0, len(paths))
for path, info := range paths {
sortInfo := FileSortInfo{info: info, path: path}
sortInfos = append(sortInfos, sortInfo)
}

return sortInfos
}

func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) []FileSortInfo {
var sortFunc func(i, j int) bool = nil
Contributor: no need to set it to nil here

switch scanSort {
case "modtime":
switch scanOrder {
case "asc":
sortFunc = func(i, j int) bool {
return sortInfos[i].info.ModTime().Before(sortInfos[j].info.ModTime())
}
case "desc":
sortFunc = func(i, j int) bool {
return sortInfos[i].info.ModTime().After(sortInfos[j].info.ModTime())
}
default:
Contributor: Would it be possible to return an explicit error on these default branches?

Contributor (author, @codeperfector, Jun 14, 2017): @tsg what do you want to happen if the default case is reached? we can abort with an error message (panic), or log an error and use the default unsorted behavior.
}
case "filename":
switch scanOrder {
case "asc":
sortFunc = func(i, j int) bool {
return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) < 0
}
case "desc":
sortFunc = func(i, j int) bool {
return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) > 0
}
default:
}
default:
}

if sortFunc != nil {
sort.Slice(sortInfos, sortFunc)
}

return sortInfos
}

func getFileState(path string, info os.FileInfo, p *Prospector) file.State {
var err error
var absolutePath string
absolutePath, err = filepath.Abs(path)
if err != nil {
logp.Err("could not fetch abs path for file %s: %s", absolutePath, err)
Contributor: Should we return here on error? Means this also can return an error?
}
logp.Debug("prospector", "Check file for harvesting: %s", absolutePath)
// Create new state for comparison
newState := file.NewState(info, absolutePath, p.config.Type)
return newState
}

func getKeys(paths map[string]os.FileInfo) []string {
files := make([]string, 0, len(paths))
for file := range paths {
files = append(files, file)
}
return files
}

// Scan starts a scanGlob for each provided path/glob
func (p *Prospector) scan() {

var sortInfos []FileSortInfo
var files []string

paths := p.getFiles()
if strings.ToLower(p.config.ScanSort) != "none" {
sortInfos = getSortedFiles(strings.ToLower(p.config.ScanOrder),
strings.ToLower(p.config.ScanSort),
getSortInfos(paths))

Contributor: I'm not a fan of calling ToLower here, as this creates some leniency in the config parsing which we don't have in other places.

Contributor: @waynemz Did you see this comment?

Contributor: We should handle the error directly inside the if clause. In case of error should we return? what are we doing?

Contributor (author): In this case we are going to print an error saying why sorting failed and continue the regular unsorted route.

Contributor: SGTM

} else {
files = getKeys(paths)
}

for i := 0; i < len(paths); i++ {

var path string
var info os.FileInfo

if strings.ToLower(p.config.ScanSort) != "none" {
path = sortInfos[i].path
info = sortInfos[i].info
} else {
path = files[i]
info = paths[path]
}

Contributor: Removing ToLower here should also mean less work on the default case.

select {
case <-p.done:
@@ -276,15 +375,7 @@ func (p *Prospector) scan() {
default:
}

newState := getFileState(path, info, p)

// Load last state
lastState := p.states.FindPrevious(newState)