From 0664d1b3e0b2296aecd6ac3cbd90990acbe6fdb1 Mon Sep 17 00:00:00 2001 From: Wayne Menezes Date: Wed, 3 May 2017 20:40:04 -0400 Subject: [PATCH 1/9] scan older files to be harvested based on mod time --- filebeat/prospector/log/config.go | 5 ++-- filebeat/prospector/log/prospector.go | 41 +++++++++++++++++++++++---- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/filebeat/prospector/log/config.go b/filebeat/prospector/log/config.go index 913c0142b409..0c1690813fd9 100644 --- a/filebeat/prospector/log/config.go +++ b/filebeat/prospector/log/config.go @@ -67,8 +67,9 @@ type config struct { RecursiveGlob bool `config:"recursive_glob.enabled"` // Harvester - BufferSize int `config:"harvester_buffer_size"` - Encoding string `config:"encoding"` + BufferSize int `config:"harvester_buffer_size"` + Encoding string `config:"encoding"` + HarvesterScanOlder bool `config:"harvester_scan_older"` ExcludeLines []match.Matcher `config:"exclude_lines"` IncludeLines []match.Matcher `config:"include_lines"` diff --git a/filebeat/prospector/log/prospector.go b/filebeat/prospector/log/prospector.go index 2c8d5647d2d6..b6aa04737e7c 100644 --- a/filebeat/prospector/log/prospector.go +++ b/filebeat/prospector/log/prospector.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" + "sort" ) const ( @@ -264,10 +265,40 @@ func (p *Prospector) matchesFile(filePath string) bool { return false } +type FileSortInfo struct { + info os.FileInfo + path string +} + +func getSortInfos(paths map[string]os.FileInfo) []FileSortInfo { + + sortInfos := make([]FileSortInfo, 0, len(paths)) + for path, info := range paths { + sortInfo := FileSortInfo{info: info, path: path} + sortInfos = append(sortInfos, sortInfo) + } + + return sortInfos; +} + +func getSortedFiles(sortInfos []FileSortInfo) []FileSortInfo { + + sort.Slice(sortInfos, func(i, j int) bool { + return sortInfos[i].info.ModTime().Before(sortInfos[j].info.ModTime()); + }) + + return sortInfos; +} + // Scan starts a scanGlob for each provided path/glob func (p *Prospector) scan() { - for path, info := range p.getFiles() { + paths := p.getFiles(); + files := getSortInfos(paths) + if p.config.HarvesterScanOlder { + files = getSortedFiles(files) + } + for _,fileInfo := range files { select { case <-p.done: @@ -277,14 +308,14 @@ func (p *Prospector) scan() { } var err error - path, err = filepath.Abs(path) + fileInfo.path, err = filepath.Abs(fileInfo.path) if err != nil { - logp.Err("could not fetch abs path for file %s: %s", path, err) + logp.Err("could not fetch abs path for file %s: %s", fileInfo.path, err) } - logp.Debug("prospector", "Check file for harvesting: %s", path) + logp.Debug("prospector", "Check file for harvesting: %s", fileInfo.path) // Create new state for comparison - newState := file.NewState(info, path, p.config.Type) + newState := file.NewState(fileInfo.info, fileInfo.path, p.config.Type) // Load last state lastState := p.states.FindPrevious(newState) From be36fdb000498ad631e6adf87e41e3bb7f9a4741 Mon Sep 17 00:00:00 2001 From: Wayne Menezes Date: Sun, 28 May 2017 16:37:07 -0400 Subject: [PATCH 2/9] scan_order to allow sorting of files by mod time or file name --- filebeat/prospector/log/config.go | 2 +- filebeat/prospector/log/prospector.go | 31 ++++++++++++++++++++------- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/filebeat/prospector/log/config.go b/filebeat/prospector/log/config.go index 0c1690813fd9..a339426c99f4 100644 --- a/filebeat/prospector/log/config.go +++ b/filebeat/prospector/log/config.go @@ -69,7 +69,7 @@ type config struct { // Harvester BufferSize int `config:"harvester_buffer_size"` Encoding string `config:"encoding"` - HarvesterScanOlder bool `config:"harvester_scan_older"` + ScanOrder string `config:"scan_order"` ExcludeLines []match.Matcher `config:"exclude_lines"` IncludeLines []match.Matcher `config:"include_lines"` diff --git a/filebeat/prospector/log/prospector.go b/filebeat/prospector/log/prospector.go index b6aa04737e7c..2c8dc5644267 100644 --- a/filebeat/prospector/log/prospector.go +++ b/filebeat/prospector/log/prospector.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" "sort" + "strings" ) const ( @@ -281,23 +282,37 @@ func getSortInfos(paths map[string]os.FileInfo) []FileSortInfo { return sortInfos; } -func getSortedFiles(sortInfos []FileSortInfo) []FileSortInfo { - - sort.Slice(sortInfos, func(i, j int) bool { - return sortInfos[i].info.ModTime().Before(sortInfos[j].info.ModTime()); - }) +func getSortedFiles(sortOrder string, sortInfos []FileSortInfo) []FileSortInfo { + switch sortOrder { + case "mod_time_ascending": + sort.Slice(sortInfos, func(i, j int) bool { + return sortInfos[i].info.ModTime().Before(sortInfos[j].info.ModTime()); + }); + case "mod_time_descending": + sort.Slice(sortInfos, func(i, j int) bool { + return sortInfos[i].info.ModTime().After(sortInfos[j].info.ModTime()); + }); + case "file_name_ascending": + sort.Slice(sortInfos, func(i, j int) bool { + return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) < 0 + }); + case "file_name_descending": + sort.Slice(sortInfos, func(i, j int) bool { + return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) > 0 + }); + default: + } return sortInfos; } + // Scan starts a scanGlob for each provided path/glob func (p *Prospector) scan() { paths := p.getFiles(); files := getSortInfos(paths) - if p.config.HarvesterScanOlder { - files = getSortedFiles(files) - } + files = getSortedFiles(strings.ToLower(p.config.ScanOrder), files) for _,fileInfo := range files { select { From ee7cfeadfb1a12e75ec86961cf45918666ce2f6d Mon Sep 17 00:00:00 2001 From: Wayne Menezes Date: Mon, 29 May 2017 17:22:35 -0400 Subject: [PATCH 3/9] code review comments --- filebeat/prospector/log/config.go | 7 ++-- filebeat/prospector/log/prospector.go | 60 ++++++++++++++------------- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/filebeat/prospector/log/config.go b/filebeat/prospector/log/config.go index a339426c99f4..2f2c475bdca9 100644 --- a/filebeat/prospector/log/config.go +++ b/filebeat/prospector/log/config.go @@ -67,9 +67,10 @@ type config struct { RecursiveGlob bool `config:"recursive_glob.enabled"` // Harvester - BufferSize int `config:"harvester_buffer_size"` - Encoding string `config:"encoding"` - ScanOrder string `config:"scan_order"` + BufferSize int `config:"harvester_buffer_size"` + Encoding string `config:"encoding"` + ScanOrder string `config:"scan.order"` + ScanSort string `config:"scan.sort"` ExcludeLines []match.Matcher `config:"exclude_lines"` IncludeLines []match.Matcher `config:"include_lines"` diff --git a/filebeat/prospector/log/prospector.go b/filebeat/prospector/log/prospector.go index 2c8dc5644267..7b2e1cf8f9c7 100644 --- a/filebeat/prospector/log/prospector.go +++ b/filebeat/prospector/log/prospector.go @@ -7,6 +7,9 @@ import ( "path/filepath" "time" + "sort" + "strings" + "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input/file" @@ -14,8 +17,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" - "sort" - "strings" ) const ( @@ -279,41 +280,42 @@ func getSortInfos(paths map[string]os.FileInfo) []FileSortInfo { sortInfos = append(sortInfos, sortInfo) } - return sortInfos; + return sortInfos } -func getSortedFiles(sortOrder string, sortInfos []FileSortInfo) []FileSortInfo { - switch sortOrder { - case "mod_time_ascending": - sort.Slice(sortInfos, func(i, j int) bool { - return sortInfos[i].info.ModTime().Before(sortInfos[j].info.ModTime()); - }); - case "mod_time_descending": - sort.Slice(sortInfos, func(i, j int) bool { - return sortInfos[i].info.ModTime().After(sortInfos[j].info.ModTime()); - }); - case "file_name_ascending": - sort.Slice(sortInfos, func(i, j int) bool { - return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) < 0 - }); - case "file_name_descending": - sort.Slice(sortInfos, func(i, j int) bool { - return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) > 0 - }); - default: - } - - return sortInfos; +func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) []FileSortInfo { + switch scanOrder + "_" + scanSort { + case "asc_modtime": + sort.Slice(sortInfos, func(i, j int) bool { + return sortInfos[i].info.ModTime().Before(sortInfos[j].info.ModTime()) + }) + case "desc_modtime": + sort.Slice(sortInfos, func(i, j int) bool { + return sortInfos[i].info.ModTime().After(sortInfos[j].info.ModTime()) + }) + case "asc_filename": + sort.Slice(sortInfos, func(i, j int) bool { + return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) < 0 + }) + case "desc_filename": + sort.Slice(sortInfos, func(i, j int) bool { + return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) > 0 + }) + default: + } + + return sortInfos } - // Scan starts a scanGlob for each provided path/glob func (p *Prospector) scan() { - paths := p.getFiles(); + paths := p.getFiles() files := getSortInfos(paths) - files = getSortedFiles(strings.ToLower(p.config.ScanOrder), files) - for _,fileInfo := range files { + files = getSortedFiles(strings.ToLower(p.config.ScanOrder), + strings.ToLower(p.config.ScanSort), + files) + for _, fileInfo := range files { select { case <-p.done: From 46e4ac2427657601c4f982fb6a5ef311c97d221e Mon Sep 17 00:00:00 2001 From: Wayne Menezes Date: Thu, 1 Jun 2017 18:50:04 -0400 Subject: [PATCH 4/9] added new properties for scan.sort, scan.order; code review suggestions --- CHANGELOG.asciidoc | 1 + .../configuration/filebeat-options.asciidoc | 19 ++++ filebeat/filebeat.full.yml | 8 ++ filebeat/prospector/log/config.go | 2 + filebeat/prospector/log/prospector.go | 107 ++++++++++++------ 5 files changed, 105 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index b7f60fee3880..4fbb93bbc2d8 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -58,6 +58,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d - Add the option to write the generated Elasticsearch mapping template into a file. {pull}4323[4323] *Filebeat* +- Added ability to sort harvested files. {pull}4374[4374] *Heartbeat* diff --git a/filebeat/docs/reference/configuration/filebeat-options.asciidoc b/filebeat/docs/reference/configuration/filebeat-options.asciidoc index 0014000a3b28..40738fe3974f 100644 --- a/filebeat/docs/reference/configuration/filebeat-options.asciidoc +++ b/filebeat/docs/reference/configuration/filebeat-options.asciidoc @@ -301,6 +301,25 @@ If you require log lines to be sent in near real time do not use a very low `sca The default setting is 10s. + +[[scan-sort]] +===== scan.sort + +Specifies if files should be harvested in order and how to determine the order. Possible values are modtime, filename and none. To sort by file modification time use modtime otherwise use filename. + +If you specify a value other than none for this setting you can determine whether to use ascending or descending order using `scan.order` + +The default setting is none + + +[[scan-order]] +===== scan.order + +Specifies ascending or descending order if `scan.sort` is set to a value other than none. Possible values are asc or desc + +The default setting is asc + + ===== harvester_buffer_size The size in bytes of the buffer that each harvester uses when fetching a file. The default is 16384. diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index c2c255bb1d00..580cb55d281a 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -241,6 +241,14 @@ filebeat.prospectors: # without causing Filebeat to scan too frequently. Default: 10s. #scan_frequency: 10s + # Specifies if files should be harvested in order and how to determine the order. + # Possible values are modtime, filename and none + #scan.sort: modtime + + # Specifies ascending or descending order if scan.sort is set + # Possible values are asc or desc + #scan.order: asc + # Defines the buffer size every harvester uses when fetching the file #harvester_buffer_size: 16384 diff --git a/filebeat/prospector/log/config.go b/filebeat/prospector/log/config.go index 2f2c475bdca9..0a51e724a690 100644 --- a/filebeat/prospector/log/config.go +++ b/filebeat/prospector/log/config.go @@ -29,6 +29,8 @@ var ( HarvesterLimit: 0, Symlinks: false, TailFiles: false, + ScanSort: "none", + ScanOrder: "asc", // Harvester BufferSize: 16 * humanize.KiByte, diff --git a/filebeat/prospector/log/prospector.go b/filebeat/prospector/log/prospector.go index 7b2e1cf8f9c7..2ff9047337ce 100644 --- a/filebeat/prospector/log/prospector.go +++ b/filebeat/prospector/log/prospector.go @@ -6,7 +6,6 @@ import ( "os" "path/filepath" "time" - "sort" "strings" @@ -284,38 +283,90 @@ func getSortInfos(paths map[string]os.FileInfo) []FileSortInfo { } func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) []FileSortInfo { - switch scanOrder + "_" + scanSort { - case "asc_modtime": - sort.Slice(sortInfos, func(i, j int) bool { - return sortInfos[i].info.ModTime().Before(sortInfos[j].info.ModTime()) - }) - case "desc_modtime": - sort.Slice(sortInfos, func(i, j int) bool { - return sortInfos[i].info.ModTime().After(sortInfos[j].info.ModTime()) - }) - case "asc_filename": - sort.Slice(sortInfos, func(i, j int) bool { - return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) < 0 - }) - case "desc_filename": - sort.Slice(sortInfos, func(i, j int) bool { - return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) > 0 - }) + var sortFunc func(i, j int) bool = nil + switch scanSort { + case "modtime": + switch scanOrder { + case "asc": + sortFunc = func(i, j int) bool { + return sortInfos[i].info.ModTime().Before(sortInfos[j].info.ModTime()) + } + case "desc": + sortFunc = func(i, j int) bool { + return sortInfos[i].info.ModTime().After(sortInfos[j].info.ModTime()) + } + default: + } + case "filename": + switch scanOrder { + case "asc": + sortFunc = func(i, j int) bool { + return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) < 0 + } + case "desc": + sortFunc = func(i, j int) bool { + return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) > 0 + } + default: + } default: } + if sortFunc != nil { + sort.Slice(sortInfos, sortFunc) + } + return sortInfos } +func getFileState(path string, info os.FileInfo, p *Prospector) (file.State) { + var err error + var absolutePath string + absolutePath, err = filepath.Abs(path) + if err != nil { + logp.Err("could not fetch abs path for file %s: %s", absolutePath, err) + } + logp.Debug("prospector", "Check file for harvesting: %s", absolutePath) + // Create new state for comparison + newState := file.NewState(info, absolutePath, p.config.Type) + return newState +} + +func getKeys(paths map[string]os.FileInfo) []string { + files := make([]string, 0) + for file := range paths { + files = append(files, file) + } + return files +} + // Scan starts a scanGlob for each provided path/glob func (p *Prospector) scan() { + var sortInfos []FileSortInfo + var files []string + paths := p.getFiles() - files := getSortInfos(paths) - files = getSortedFiles(strings.ToLower(p.config.ScanOrder), - strings.ToLower(p.config.ScanSort), - files) - for _, fileInfo := range files { + if strings.ToLower(p.config.ScanSort) != "none" { + sortInfos = getSortedFiles(strings.ToLower(p.config.ScanOrder), + strings.ToLower(p.config.ScanSort), + getSortInfos(paths)) + } else { + files = getKeys(paths) + } + + for i := 0; i < len(paths); i++ { + + var path string + var info os.FileInfo + + if strings.ToLower(p.config.ScanSort) != "none" { + path = sortInfos[i].path + info = sortInfos[i].info + } else { + path = files[i] + info = paths[path] + } select { case <-p.done: @@ -324,15 +375,7 @@ func (p *Prospector) scan() { default: } - var err error - fileInfo.path, err = filepath.Abs(fileInfo.path) - if err != nil { - logp.Err("could not fetch abs path for file %s: %s", fileInfo.path, err) - } - logp.Debug("prospector", "Check file for harvesting: %s", fileInfo.path) - - // Create new state for comparison - newState := file.NewState(fileInfo.info, fileInfo.path, p.config.Type) + newState := getFileState(path, info, p) // Load last state lastState := p.states.FindPrevious(newState) From e7e615bc3ba3303cea222d518a6833cc321bdcb1 Mon Sep 17 00:00:00 2001 From: Wayne Menezes Date: Mon, 5 Jun 2017 11:46:44 -0400 Subject: [PATCH 5/9] correct goimports style --- filebeat/prospector/log/prospector.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/filebeat/prospector/log/prospector.go b/filebeat/prospector/log/prospector.go index 2ff9047337ce..8fb380bd2f47 100644 --- a/filebeat/prospector/log/prospector.go +++ b/filebeat/prospector/log/prospector.go @@ -5,9 +5,9 @@ import ( "fmt" "os" "path/filepath" - "time" "sort" "strings" + "time" "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/harvester" @@ -319,7 +319,7 @@ func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) return sortInfos } -func getFileState(path string, info os.FileInfo, p *Prospector) (file.State) { +func getFileState(path string, info os.FileInfo, p *Prospector) file.State { var err error var absolutePath string absolutePath, err = filepath.Abs(path) From 4c23c367efc983e600a75e22bb47cd45954499c8 Mon Sep 17 00:00:00 2001 From: Wayne Menezes Date: Sat, 10 Jun 2017 16:31:49 -0400 Subject: [PATCH 6/9] fix filebeat.full.yml --- filebeat/filebeat.full.yml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 7545124de47d..2f85dcfd1243 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -241,14 +241,6 @@ filebeat.prospectors: # without causing Filebeat to scan too frequently. Default: 10s. #scan_frequency: 10s - # Specifies if files should be harvested in order and how to determine the order. - # Possible values are modtime, filename and none - #scan.sort: modtime - - # Specifies ascending or descending order if scan.sort is set - # Possible values are asc or desc - #scan.order: asc - # Defines the buffer size every harvester uses when fetching the file #harvester_buffer_size: 16384 From 92ff5a0d3a0b53de727db4e51af2af94c4852afc Mon Sep 17 00:00:00 2001 From: Wayne Menezes Date: Tue, 13 Jun 2017 21:04:26 -0400 Subject: [PATCH 7/9] code review fixes --- CHANGELOG.asciidoc | 2 -- .../configuration/filebeat-options.asciidoc | 8 ++--- filebeat/prospector/log/prospector.go | 34 +++++++++++++------ 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index be50ab73d525..3e1437f27a85 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -135,8 +135,6 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...v6.0.0-alpha2[View comm - Added ability to sort harvested files. {pull}4374[4374] - Add experimental Redis slow log prospector type. {pull}4180[4180] -- Add experimental Redis slow log prospector type. {pull}4180[4180] - *Metricbeat* - Add macOS implementation of the system diskio metricset. {issue}4144[4144] diff --git a/filebeat/docs/reference/configuration/filebeat-options.asciidoc b/filebeat/docs/reference/configuration/filebeat-options.asciidoc index 15c1e177f34d..9e6c7ecb4eba 100644 --- a/filebeat/docs/reference/configuration/filebeat-options.asciidoc +++ b/filebeat/docs/reference/configuration/filebeat-options.asciidoc @@ -308,17 +308,17 @@ The default setting is 10s. Specifies if files should be harvested in order and how to determine the order. Possible values are modtime, filename and none. To sort by file modification time use modtime otherwise use filename. -If you specify a value other than none for this setting you can determine whether to use ascending or descending order using `scan.order` +If you specify a value other than none for this setting you can determine whether to use ascending or descending order using `scan.order`. -The default setting is none +The default setting is none. [[scan-order]] ===== scan.order -Specifies ascending or descending order if `scan.sort` is set to a value other than none. Possible values are asc or desc +Specifies ascending or descending order if `scan.sort` is set to a value other than none. Possible values are asc or desc. -The default setting is asc +The default setting is asc. ===== harvester_buffer_size diff --git a/filebeat/prospector/log/prospector.go b/filebeat/prospector/log/prospector.go index 8fb380bd2f47..c640912f960f 100644 --- a/filebeat/prospector/log/prospector.go +++ b/filebeat/prospector/log/prospector.go @@ -282,7 +282,7 @@ func getSortInfos(paths map[string]os.FileInfo) []FileSortInfo { return sortInfos } -func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) []FileSortInfo { +func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) ([]FileSortInfo, string) { var sortFunc func(i, j int) bool = nil switch scanSort { case "modtime": @@ -296,6 +296,7 @@ func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) return sortInfos[i].info.ModTime().After(sortInfos[j].info.ModTime()) } default: + return nil, "Unexpected value for scan.order: " + scanOrder } case "filename": switch scanOrder { @@ -308,15 +309,17 @@ func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) > 0 } default: + return nil, "Unexpected value for scan.order: " + scanOrder } default: + return nil, "Unexpected value for scan.sort: " + scanSort } if sortFunc != nil { sort.Slice(sortInfos, sortFunc) } - return sortInfos + return sortInfos, "" } func getFileState(path string, info os.FileInfo, p *Prospector) file.State { @@ -343,15 +346,24 @@ func getKeys(paths map[string]os.FileInfo) []string { // Scan starts a scanGlob for each provided path/glob func (p *Prospector) scan() { - var sortInfos []FileSortInfo - var files []string + var sortInfos []FileSortInfo = nil + var files []string = nil paths := p.getFiles() - if strings.ToLower(p.config.ScanSort) != "none" { - sortInfos = getSortedFiles(strings.ToLower(p.config.ScanOrder), + + var err string = "" + + if p.config.ScanSort != "none" { + sortInfos, err = getSortedFiles(strings.ToLower(p.config.ScanOrder), strings.ToLower(p.config.ScanSort), getSortInfos(paths)) - } else { + } + + if err != "" { + logp.Err(err) + } + + if sortInfos == nil { files = getKeys(paths) } @@ -360,12 +372,12 @@ func (p *Prospector) scan() { var path string var info os.FileInfo - if strings.ToLower(p.config.ScanSort) != "none" { - path = sortInfos[i].path - info = sortInfos[i].info - } else { + if sortInfos == nil { path = files[i] info = paths[path] + } else { + path = sortInfos[i].path + info = sortInfos[i].info } select { From fe3eda0d8c61f4b210565bb6fc349bb59f40d9d8 Mon Sep 17 00:00:00 2001 From: Wayne Menezes Date: Tue, 20 Jun 2017 22:15:10 -0400 Subject: [PATCH 8/9] error handling --- filebeat/prospector/log/prospector.go | 36 ++++++++++++++------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/filebeat/prospector/log/prospector.go b/filebeat/prospector/log/prospector.go index c640912f960f..a9965d0a6625 100644 --- a/filebeat/prospector/log/prospector.go +++ b/filebeat/prospector/log/prospector.go @@ -282,8 +282,8 @@ func getSortInfos(paths map[string]os.FileInfo) []FileSortInfo { return sortInfos } -func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) ([]FileSortInfo, string) { - var sortFunc func(i, j int) bool = nil +func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) ([]FileSortInfo, error) { + var sortFunc func(i, j int) bool switch scanSort { case "modtime": switch scanOrder { @@ -296,7 +296,7 @@ func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) return sortInfos[i].info.ModTime().After(sortInfos[j].info.ModTime()) } default: - return nil, "Unexpected value for scan.order: " + scanOrder + return nil, fmt.Errorf("Unexpected value for scan.order: %v", scanOrder) } case "filename": switch scanOrder { @@ -309,30 +309,30 @@ func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) return strings.Compare(sortInfos[i].info.Name(), sortInfos[j].info.Name()) > 0 } default: - return nil, "Unexpected value for scan.order: " + scanOrder + return nil, fmt.Errorf("Unexpected value for scan.order: %v", scanOrder) } default: - return nil, "Unexpected value for scan.sort: " + scanSort + return nil, fmt.Errorf("Unexpected value for scan.sort: %v", scanSort) } if sortFunc != nil { sort.Slice(sortInfos, sortFunc) } - return sortInfos, "" + return sortInfos, nil } -func getFileState(path string, info os.FileInfo, p *Prospector) file.State { +func getFileState(path string, info os.FileInfo, p *Prospector) (file.State, error) { var err error var absolutePath string absolutePath, err = filepath.Abs(path) if err != nil { - logp.Err("could not fetch abs path for file %s: %s", absolutePath, err) + return file.State{}, fmt.Errorf("could not fetch abs path for file %s: %s", absolutePath, err) } logp.Debug("prospector", "Check file for harvesting: %s", absolutePath) // Create new state for comparison newState := file.NewState(info, absolutePath, p.config.Type) - return newState + return newState, nil } func getKeys(paths map[string]os.FileInfo) []string { @@ -346,21 +346,20 @@ func getKeys(paths map[string]os.FileInfo) []string { // Scan starts a scanGlob for each provided path/glob func (p *Prospector) scan() { - var sortInfos []FileSortInfo = nil - var files []string = nil + var sortInfos []FileSortInfo + var files []string paths := p.getFiles() - var err string = "" + var err error if p.config.ScanSort != "none" { sortInfos, err = getSortedFiles(strings.ToLower(p.config.ScanOrder), strings.ToLower(p.config.ScanSort), getSortInfos(paths)) - } - - if err != "" { - logp.Err(err) + if err != nil { + logp.Err("Failed to sort files during scan due to error %s", err) + } } if sortInfos == nil { @@ -387,7 +386,10 @@ func (p *Prospector) scan() { default: } - newState := getFileState(path, info, p) + newState, err := getFileState(path, info, p) + if err != nil { + logp.Err("Skipping file %s due to error %s", path, err) + } // Load last state lastState := p.states.FindPrevious(newState) From 23e16a1a5ecbd3cb985662b9ff4f388b69f07073 Mon Sep 17 00:00:00 2001 From: Wayne Menezes Date: Thu, 22 Jun 2017 17:24:22 -0400 Subject: [PATCH 9/9] code review fix --- filebeat/prospector/log/prospector.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/filebeat/prospector/log/prospector.go b/filebeat/prospector/log/prospector.go index a9965d0a6625..9d7a84825eec 100644 --- a/filebeat/prospector/log/prospector.go +++ b/filebeat/prospector/log/prospector.go @@ -354,9 +354,7 @@ func (p *Prospector) scan() { var err error if p.config.ScanSort != "none" { - sortInfos, err = getSortedFiles(strings.ToLower(p.config.ScanOrder), - strings.ToLower(p.config.ScanSort), - getSortInfos(paths)) + sortInfos, err = getSortedFiles(p.config.ScanOrder, p.config.ScanSort, getSortInfos(paths)) if err != nil { logp.Err("Failed to sort files during scan due to error %s", err) }