Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scan older files to be harvested based on mod time #4374

Merged
merged 11 commits into from
Jun 27, 2017
2 changes: 1 addition & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...v6.0.0-alpha2[View comm
- Add `logging.files` `permissions` option. {pull}4295[4295]

*Filebeat*

- Added ability to sort harvested files. {pull}4374[4374]
- Add experimental Redis slow log prospector type. {pull}4180[4180]

*Metricbeat*
Expand Down
19 changes: 19 additions & 0 deletions filebeat/docs/reference/configuration/filebeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,25 @@ If you require log lines to be sent in near real time do not use a very low `sca

The default setting is 10s.


[[scan-sort]]
===== scan.sort

Specifies if files should be harvested in order and how to determine the order. Possible values are modtime, filename and none. To sort by file modification time use modtime otherwise use filename.

If you specify a value other than none for this setting you can determine whether to use ascending or descending order using `scan.order`.

The default setting is none.


[[scan-order]]
===== scan.order

Specifies ascending or descending order if `scan.sort` is set to a value other than none. Possible values are asc or desc.

The default setting is asc.


===== harvester_buffer_size

The size in bytes of the buffer that each harvester uses when fetching a file. The default is 16384.
Expand Down
4 changes: 4 additions & 0 deletions filebeat/prospector/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ var (
HarvesterLimit: 0,
Symlinks: false,
TailFiles: false,
ScanSort: "none",
ScanOrder: "asc",

// Harvester
BufferSize: 16 * humanize.KiByte,
Expand Down Expand Up @@ -69,6 +71,8 @@ type config struct {
// Harvester
BufferSize int `config:"harvester_buffer_size"`
Encoding string `config:"encoding"`
ScanOrder string `config:"scan.order"`
ScanSort string `config:"scan.sort"`

ExcludeLines []match.Matcher `config:"exclude_lines"`
IncludeLines []match.Matcher `config:"include_lines"`
Expand Down
119 changes: 111 additions & 8 deletions filebeat/prospector/log/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"

"github.com/elastic/beats/filebeat/channel"
Expand Down Expand Up @@ -264,10 +266,116 @@ func (p *Prospector) matchesFile(filePath string) bool {
return false
}

type FileSortInfo struct {
info os.FileInfo
path string
}

func getSortInfos(paths map[string]os.FileInfo) []FileSortInfo {

sortInfos := make([]FileSortInfo, 0, len(paths))
for path, info := range paths {
sortInfo := FileSortInfo{info: info, path: path}
sortInfos = append(sortInfos, sortInfo)
}

return sortInfos
}

func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) ([]FileSortInfo, error) {
var sortFunc func(i, j int) bool
switch scanSort {
case "modtime":
switch scanOrder {
case "asc":
sortFunc = func(i, j int) bool {
return sortInfos[i].info.ModTime().Before(sortInfos[j].info.ModTime())
}
case "desc":
sortFunc = func(i, j int) bool {
return sortInfos[i].info.ModTime().After(sortInfos[j].info.ModTime())
}
default:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to return an explicit error on these default branches?

Copy link
Contributor Author

@codeperfector codeperfector Jun 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tsg what do you want to happen if the default case is reached? we can abort with an error message (panic), or log an error and use the default unsorted behavior.

return nil, fmt.Errorf("Unexpected value for scan.order: %v", scanOrder)
}
case "filename":
switch scanOrder {
case "asc":
sortFunc = func(i, j int) bool {
return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) < 0
}
case "desc":
sortFunc = func(i, j int) bool {
return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) > 0
}
default:
return nil, fmt.Errorf("Unexpected value for scan.order: %v", scanOrder)
}
default:
return nil, fmt.Errorf("Unexpected value for scan.sort: %v", scanSort)
}

if sortFunc != nil {
sort.Slice(sortInfos, sortFunc)
}

return sortInfos, nil
}

func getFileState(path string, info os.FileInfo, p *Prospector) (file.State, error) {
var err error
var absolutePath string
absolutePath, err = filepath.Abs(path)
if err != nil {
return file.State{}, fmt.Errorf("could not fetch abs path for file %s: %s", absolutePath, err)
}
logp.Debug("prospector", "Check file for harvesting: %s", absolutePath)
// Create new state for comparison
newState := file.NewState(info, absolutePath, p.config.Type)
return newState, nil
}

func getKeys(paths map[string]os.FileInfo) []string {
files := make([]string, 0)
for file := range paths {
files = append(files, file)
}
return files
}

// Scan starts a scanGlob for each provided path/glob
func (p *Prospector) scan() {

for path, info := range p.getFiles() {
var sortInfos []FileSortInfo
var files []string

paths := p.getFiles()

var err error

if p.config.ScanSort != "none" {
sortInfos, err = getSortedFiles(p.config.ScanOrder, p.config.ScanSort, getSortInfos(paths))
if err != nil {
logp.Err("Failed to sort files during scan due to error %s", err)
}
}

if sortInfos == nil {
files = getKeys(paths)
}

for i := 0; i < len(paths); i++ {

var path string
var info os.FileInfo

if sortInfos == nil {
path = files[i]
info = paths[path]
} else {
path = sortInfos[i].path
info = sortInfos[i].info
}

select {
case <-p.done:
Expand All @@ -276,15 +384,10 @@ func (p *Prospector) scan() {
default:
}

var err error
path, err = filepath.Abs(path)
newState, err := getFileState(path, info, p)
if err != nil {
logp.Err("could not fetch abs path for file %s: %s", path, err)
logp.Err("Skipping file %s due to error %s", path, err)
}
logp.Debug("prospector", "Check file for harvesting: %s", path)

// Create new state for comparison
newState := file.NewState(info, path, p.config.Type)

// Load last state
lastState := p.states.FindPrevious(newState)
Expand Down