Skip to content

Commit

Permalink
Scan older files to be harvested based on mod time (elastic#4374)
Browse files Browse the repository at this point in the history
  • Loading branch information
codeperfector committed Sep 5, 2017
1 parent 08c2542 commit c56d3b4
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d
- Add `add_docker_metadata` processor. {pull}4352[4352]

*Filebeat*
- Add ability to sort harvested files. {pull}4374[4374]
- Add experimental Redis slow log prospector type. {pull}4180[4180]

*Heartbeat*
Expand Down
19 changes: 19 additions & 0 deletions filebeat/docs/reference/configuration/filebeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,25 @@ If you require log lines to be sent in near real time do not use a very low `sca

The default setting is 10s.


[[scan-sort]]
===== scan.sort

Specifies whether files should be harvested in order and how that order is determined. Possible values are `modtime`, `filename`, and `none`. To sort by file modification time, use `modtime`; to sort by file name, use `filename`.

If you specify a value other than `none` for this setting, you can choose between ascending and descending order with `scan.order`.

The default setting is none.


[[scan-order]]
===== scan.order

Specifies whether files are harvested in ascending or descending order when `scan.sort` is set to a value other than `none`. Possible values are `asc` and `desc`.

The default setting is asc.


===== harvester_buffer_size

The size in bytes of the buffer that each harvester uses when fetching a file. The default is 16384.
Expand Down
4 changes: 4 additions & 0 deletions filebeat/prospector/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ var (
HarvesterLimit: 0,
Symlinks: false,
TailFiles: false,
ScanSort: "none",
ScanOrder: "asc",

// Harvester
BufferSize: 16 * humanize.KiByte,
Expand Down Expand Up @@ -69,6 +71,8 @@ type config struct {
// Harvester
BufferSize int `config:"harvester_buffer_size"`
Encoding string `config:"encoding"`
ScanOrder string `config:"scan.order"`
ScanSort string `config:"scan.sort"`

ExcludeLines []match.Matcher `config:"exclude_lines"`
IncludeLines []match.Matcher `config:"include_lines"`
Expand Down
119 changes: 111 additions & 8 deletions filebeat/prospector/log/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"

"github.com/elastic/beats/filebeat/channel"
Expand Down Expand Up @@ -264,10 +266,116 @@ func (p *Prospector) matchesFile(filePath string) bool {
return false
}

// FileSortInfo pairs a file's path with its os.FileInfo so that a list of
// files can be reordered (see getSortedFiles) without losing track of which
// path each FileInfo belongs to.
type FileSortInfo struct {
info os.FileInfo // file metadata; ModTime and Name are read when sorting
path string // path the file was found under (the key of the scanned file map)
}

// getSortInfos flattens a path -> FileInfo map into a slice of FileSortInfo
// entries, one per map entry. The slice order follows map iteration order and
// is therefore unspecified; callers are expected to sort it afterwards.
func getSortInfos(paths map[string]os.FileInfo) []FileSortInfo {
	entries := make([]FileSortInfo, len(paths))

	idx := 0
	for p, fi := range paths {
		entries[idx] = FileSortInfo{path: p, info: fi}
		idx++
	}

	return entries
}

// getSortedFiles sorts sortInfos in place and returns it.
//
// scanSort selects the primary key: "modtime" compares the files'
// modification times, "filename" compares the base names reported by
// os.FileInfo.Name. scanOrder selects the direction: "asc" or "desc".
// Any other value for either argument (including "none" for scanSort) yields
// a nil slice and an error; scanSort is validated before scanOrder.
//
// Entries that compare equal on the primary key are ordered by path so that
// the result does not depend on the (random) order of the input. Descending
// order is the exact reverse of ascending order, tie-break included.
func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) ([]FileSortInfo, error) {
	// compare returns <0, 0 or >0 like strings.Compare, on the primary key only.
	var compare func(a, b FileSortInfo) int
	switch scanSort {
	case "modtime":
		compare = func(a, b FileSortInfo) int {
			at, bt := a.info.ModTime(), b.info.ModTime()
			switch {
			case at.Before(bt):
				return -1
			case at.After(bt):
				return 1
			}
			return 0
		}
	case "filename":
		compare = func(a, b FileSortInfo) int {
			return strings.Compare(a.info.Name(), b.info.Name())
		}
	default:
		return nil, fmt.Errorf("Unexpected value for scan.sort: %v", scanSort)
	}

	descending := false
	switch scanOrder {
	case "asc":
	case "desc":
		descending = true
	default:
		return nil, fmt.Errorf("Unexpected value for scan.order: %v", scanOrder)
	}

	sort.Slice(sortInfos, func(i, j int) bool {
		a, b := sortInfos[i], sortInfos[j]
		if descending {
			// Swapping the operands reverses the whole ordering.
			a, b = b, a
		}
		if c := compare(a, b); c != 0 {
			return c < 0
		}
		// Equal primary keys: fall back to the path, which is unique per entry.
		return a.path < b.path
	})

	return sortInfos, nil
}

// getFileState builds the file.State for a scanned file so it can be compared
// against the previously recorded state.
//
// path is first resolved to an absolute path; info and the prospector's
// configured type are passed on to file.NewState together with that absolute
// path. If the path cannot be made absolute, a zero file.State and an error
// are returned.
func getFileState(path string, info os.FileInfo, p *Prospector) (file.State, error) {
	absolutePath, err := filepath.Abs(path)
	if err != nil {
		// filepath.Abs returns an empty string on failure, so report the
		// path we were given rather than the (empty) result.
		return file.State{}, fmt.Errorf("could not fetch abs path for file %s: %s", path, err)
	}
	logp.Debug("prospector", "Check file for harvesting: %s", absolutePath)

	// Create new state for comparison
	return file.NewState(info, absolutePath, p.config.Type), nil
}

// getKeys returns the keys (file paths) of paths as a slice. The order follows
// map iteration order and is therefore unspecified. The result is never nil,
// even for an empty or nil map.
func getKeys(paths map[string]os.FileInfo) []string {
	// The final length is known, so allocate once instead of growing on append.
	keys := make([]string, 0, len(paths))
	for path := range paths {
		keys = append(keys, path)
	}
	return keys
}

// Scan starts a scanGlob for each provided path/glob
func (p *Prospector) scan() {

for path, info := range p.getFiles() {
var sortInfos []FileSortInfo
var files []string

paths := p.getFiles()

var err error

if p.config.ScanSort != "none" {
sortInfos, err = getSortedFiles(p.config.ScanOrder, p.config.ScanSort, getSortInfos(paths))
if err != nil {
logp.Err("Failed to sort files during scan due to error %s", err)
}
}

if sortInfos == nil {
files = getKeys(paths)
}

for i := 0; i < len(paths); i++ {

var path string
var info os.FileInfo

if sortInfos == nil {
path = files[i]
info = paths[path]
} else {
path = sortInfos[i].path
info = sortInfos[i].info
}

select {
case <-p.done:
Expand All @@ -276,15 +384,10 @@ func (p *Prospector) scan() {
default:
}

var err error
path, err = filepath.Abs(path)
newState, err := getFileState(path, info, p)
if err != nil {
logp.Err("could not fetch abs path for file %s: %s", path, err)
logp.Err("Skipping file %s due to error %s", path, err)
}
logp.Debug("prospector", "Check file for harvesting: %s", path)

// Create new state for comparison
newState := file.NewState(info, path, p.config.Type)

// Load last state
lastState := p.states.FindPrevious(newState)
Expand Down

0 comments on commit c56d3b4

Please sign in to comment.