Skip to content

Commit

Permalink
Merge pull request elastic#5 from tsg/fix_logging_msgs
Browse files Browse the repository at this point in the history
Fixed calls to logp.Info
  • Loading branch information
ruflin committed Sep 11, 2015
2 parents ebb781f + 7c537b8 commit cf34826
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 61 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
filebeat
.DS_Store
.filebeat
*.swp
*.swo

cover
profile.cov
Expand Down
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ run: build
.PHONY: test
test:
$(GODEP) go test -short ./...
make -C tests test

.PHONY: cover
cover:
Expand Down
23 changes: 12 additions & 11 deletions beat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package beat
import (
"flag"
"fmt"
"os"
"time"

cfg "github.com/elastic/filebeat/config"
. "github.com/elastic/filebeat/crawler"
. "github.com/elastic/filebeat/input"
"github.com/elastic/libbeat/beat"
"github.com/elastic/libbeat/cfgfile"
"github.com/elastic/libbeat/common"
"github.com/elastic/libbeat/logp"
"os"
"time"
"github.com/elastic/libbeat/cfgfile"
)

var exitStat = struct {
Expand Down Expand Up @@ -111,14 +112,14 @@ func (fb *Filebeat) Stop() {

// emitOptions logs the effective command-line configuration at startup so a
// user can confirm which options filebeat is actually running with.
// It only reads cfg.CmdlineOptions and the package-level configDirPath;
// it has no side effects beyond logging.
func emitOptions() {
	logp.Info("\t--- options -------")
	logp.Info("\tconfig-arg: %s", configDirPath)
	logp.Info("\tidle-timeout: %v", cfg.CmdlineOptions.IdleTimeout)
	logp.Info("\tspool-size: %d", cfg.CmdlineOptions.SpoolSize)
	logp.Info("\tharvester-buff-size: %d", cfg.CmdlineOptions.HarvesterBufferSize)
	logp.Info("\t--- flags ---------")
	logp.Info("\ttail (on-rotation): %t", cfg.CmdlineOptions.TailOnRotate)
	logp.Info("\tquiet: %t", cfg.CmdlineOptions.Quiet)
}

func Publish(beat *beat.Beat, fb *Filebeat) {
Expand Down
13 changes: 7 additions & 6 deletions beat/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package beat

import (
"encoding/json"
"os"

. "github.com/elastic/filebeat/input"
"github.com/elastic/libbeat/logp"
"os"
)

func Registrar(state map[string]*FileState, input chan []*FileEvent) {
logp.Info("registrar", "Starting Registrar")
logp.Debug("registrar", "Starting Registrar")
for events := range input {
logp.Info("registrar", "Registrar: processing %d events", len(events))
logp.Debug("registrar", "Registrar: processing %d events", len(events))
// Take the last event found for each file source
for _, event := range events {
// skip stdin
Expand All @@ -23,18 +24,18 @@ func Registrar(state map[string]*FileState, input chan []*FileEvent) {

if e := writeRegistry(state, ".filebeat"); e != nil {
// REVU: but we should panic, or something, right?
logp.Info("registrar", "WARNING: (continuing) update of registry returned error: %s", e)
logp.Warn("WARNING: (continuing) update of registry returned error: %s", e)
}
}
logp.Info("registrar", "Ending Registrar")
logp.Debug("registrar", "Ending Registrar")
}

// writeRegistry Writes the new json registry file to disk
func writeRegistry(state map[string]*FileState, path string) error {
tempfile := path + ".new"
file, e := os.Create(tempfile)
if e != nil {
logp.Info("registrar", "Failed to create tempfile (%s) for writing: %s", tempfile, e)
logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, e)
return e
}
defer file.Close()
Expand Down
23 changes: 12 additions & 11 deletions crawler/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"bufio"
"bytes"
"fmt"
cfg "github.com/elastic/filebeat/config"
. "github.com/elastic/filebeat/input"
"github.com/elastic/libbeat/logp"
"io"
"os"
"time"

cfg "github.com/elastic/filebeat/config"
. "github.com/elastic/filebeat/input"
"github.com/elastic/libbeat/logp"
)

type Harvester struct {
Expand Down Expand Up @@ -53,18 +54,18 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
// Check to see if the file was truncated
info, _ := h.file.Stat()
if info.Size() < h.Offset {
logp.Info("harvester", "File truncated, seeking to beginning: %s", h.Path)
logp.Debug("harvester", "File truncated, seeking to beginning: %s", h.Path)
h.file.Seek(0, os.SEEK_SET)
h.Offset = 0
} else if age := time.Since(last_read_time); age > h.FileConfig.DeadtimeSpan {
// if last_read_time was more than dead time, this file is probably
// dead. Stop watching it.
logp.Info("harvester", "Stopping harvest of ", h.Path, "last change was: ", age)
logp.Debug("harvester", "Stopping harvest of ", h.Path, "last change was: ", age)
return
}
continue
} else {
logp.Info("harvester", "Unexpected state reading from %s; error: %s", h.Path, err)
logp.Err("Unexpected state reading from %s; error: %s", h.Path, err)
return
}
}
Expand All @@ -91,11 +92,11 @@ func (h *Harvester) initOffset() {
offset, _ := h.file.Seek(0, os.SEEK_CUR)

if h.Offset > 0 {
logp.Info("harvester", "harvest: %q position:%d (offset snapshot:%d)", h.Path, h.Offset, offset)
logp.Debug("harvester", "harvest: %q position:%d (offset snapshot:%d)", h.Path, h.Offset, offset)
} else if cfg.CmdlineOptions.TailOnRotate {
logp.Info("harvester", "harvest: (tailing) %q (offset snapshot:%d)", h.Path, offset)
logp.Debug("harvester", "harvest: (tailing) %q (offset snapshot:%d)", h.Path, offset)
} else {
logp.Info("harvester", "harvest: %q (offset snapshot:%d)", h.Path, offset)
logp.Debug("harvester", "harvest: %q (offset snapshot:%d)", h.Path, offset)
}

h.Offset = offset
Expand Down Expand Up @@ -126,7 +127,7 @@ func (h *Harvester) open() *os.File {

if err != nil {
// retry on failure.
logp.Info("harvester", "Failed opening %s: %s", h.Path, err)
logp.Err("Failed opening %s: %s", h.Path, err)
time.Sleep(5 * time.Second)
} else {
break
Expand Down Expand Up @@ -181,7 +182,7 @@ func (h *Harvester) readline(reader *bufio.Reader, buffer *bytes.Buffer, eof_tim
}
continue
} else {
logp.Info("harvester", "error: Harvester.readLine: %s", err.Error())
logp.Err("Harvester.readLine: %s", err.Error())
return nil, 0, err // TODO(sissel): don't do this?
}
}
Expand Down
55 changes: 28 additions & 27 deletions crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package crawler

import (
"encoding/json"
cfg "github.com/elastic/filebeat/config"
. "github.com/elastic/filebeat/input"
"github.com/elastic/libbeat/logp"
"os"
"path/filepath"
"time"

cfg "github.com/elastic/filebeat/config"
. "github.com/elastic/filebeat/input"
"github.com/elastic/libbeat/logp"
)

// Last reading state of the prospector
Expand Down Expand Up @@ -42,9 +43,9 @@ func (restart *ProspectorResume) LoadState() {
defer existing.Close()
wd := ""
if wd, e = os.Getwd(); e != nil {
logp.Info("filebeat", "WARNING: os.Getwd retuned unexpected error %s -- ignoring", e.Error())
logp.Warn("WARNING: os.Getwd retuned unexpected error %s -- ignoring", e.Error())
}
logp.Info("filebeat", "Loading registrar data from %s/.filebeat", wd)
logp.Info("Loading registrar data from %s/.filebeat", wd)

decoder := json.NewDecoder(existing)
decoder.Decode(&restart.Files)
Expand All @@ -64,7 +65,7 @@ func (restart *ProspectorResume) Scan(files []cfg.FileConfig, persist map[string

// Now determine which states we need to persist by pulling the events from the prospectors
// When we hit a nil source a prospector had finished so we decrease the expected events
logp.Info("filebeat", "Waiting for %d prospectors to initialise", pendingProspectorCnt)
logp.Debug("prospector", "Waiting for %d prospectors to initialise", pendingProspectorCnt)

for event := range restart.Persist {
if event.Source == nil {
Expand All @@ -75,10 +76,10 @@ func (restart *ProspectorResume) Scan(files []cfg.FileConfig, persist map[string
continue
}
persist[*event.Source] = event
logp.Info("filebeat", "Registrar will re-save state for %s", *event.Source)
logp.Debug("prospector", "Registrar will re-save state for %s", *event.Source)
}

logp.Info("filebeat", "All prospectors initialised with %d states to persist", len(persist))
logp.Info("All prospectors initialised with %d states to persist", len(persist))

}

Expand Down Expand Up @@ -137,11 +138,11 @@ func (p *Prospector) Prospect(resume *ProspectorResume, output chan *FileEvent)

func (p *Prospector) scan(path string, output chan *FileEvent, resume *ProspectorResume) {

logp.Info("prospector", "scan path %s", path)
logp.Debug("prospector", "scan path %s", path)
// Evaluate the path as a wildcards/shell glob
matches, err := filepath.Glob(path)
if err != nil {
logp.Info("prospector", "glob(%s) failed: %v", path, err)
logp.Debug("prospector", "glob(%s) failed: %v", path, err)
return
}

Expand All @@ -150,7 +151,7 @@ func (p *Prospector) scan(path string, output chan *FileEvent, resume *Prospecto

// Check any matched files to see if we need to start a harvester
for _, file := range matches {
logp.Info("prospector", "Check file for harvesting: %s", file)
logp.Debug("prospector", "Check file for harvesting: %s", file)
// Stat the file, following any symlinks.
fileinfo, err := os.Stat(file)

Expand All @@ -160,12 +161,12 @@ func (p *Prospector) scan(path string, output chan *FileEvent, resume *Prospecto

// TODO(sissel): check err
if err != nil {
logp.Info("prospector", "stat(%s) failed: %s", file, err)
logp.Debug("prospector", "stat(%s) failed: %s", file, err)
continue
}

if newFile.FileInfo.IsDir() {
logp.Info("prospector", "Skipping directory: %s", file)
logp.Debug("prospector", "Skipping directory: %s", file)
continue
}

Expand All @@ -182,7 +183,7 @@ func (p *Prospector) scan(path string, output chan *FileEvent, resume *Prospecto
// - file path hasn't been seen before
// - the file's inode or device changed
if !is_known {
logp.Info("prospector", "Start harvesting unkown file:", file)
logp.Debug("prospector", "Start harvesting unkown file:", file)
// Create a new prospector info with the stat info for comparison
newinfo = ProspectorInfo{Fileinfo: newFile.FileInfo, Harvester: make(chan int64, 1), Last_seen: p.iteration}

Expand All @@ -201,17 +202,17 @@ func (p *Prospector) scan(path string, output chan *FileEvent, resume *Prospecto
// This is safe as the harvester, once it hits the EOF and a timeout, will stop harvesting
// Once we detect changes again we can resume another harvester again - this keeps number of go routines to a minimum
if is_resuming {
logp.Info("prospector", "Resuming harvester on a previously harvested file: %s", file)
logp.Debug("prospector", "Resuming harvester on a previously harvested file: %s", file)
harvester := &Harvester{Path: file, FileConfig: p.FileConfig, Offset: offset, FinishChan: newinfo.Harvester}
go harvester.Harvest(output)
} else {
// Old file, skip it, but push offset of file size so we start from the end if this file changes and needs picking up
logp.Info("prospector", "Skipping file (older than dead time of %v): %s", p.FileConfig.DeadtimeSpan, file)
logp.Debug("prospector", "Skipping file (older than dead time of %v): %s", p.FileConfig.DeadtimeSpan, file)
newinfo.Harvester <- newFile.FileInfo.Size()
}
} else if previous := p.isFileRenamed(file, newFile.FileInfo, missinginfo); previous != "" {
// This file was simply renamed (known inode+dev) - link the same harvester channel as the old file
logp.Info("prospector", "File rename was detected: %s -> %s", previous, file)
logp.Debug("prospector", "File rename was detected: %s -> %s", previous, file)

newinfo.Harvester = p.prospectorinfo[previous].Harvester
} else {
Expand All @@ -225,9 +226,9 @@ func (p *Prospector) scan(path string, output chan *FileEvent, resume *Prospecto

// Are we resuming a file or is this a completely new file?
if is_resuming {
logp.Info("prospector", "Resuming harvester on a previously harvested file: %s", file)
logp.Debug("prospector", "Resuming harvester on a previously harvested file: %s", file)
} else {
logp.Info("prospector", "Launching harvester on new file: %s", file)
logp.Debug("prospector", "Launching harvester on new file: %s", file)
}

// Launch the harvester
Expand All @@ -236,21 +237,21 @@ func (p *Prospector) scan(path string, output chan *FileEvent, resume *Prospecto
}
} else {

logp.Info("prospector", "Update existing file for harvesting:", file)
logp.Debug("prospector", "Update existing file for harvesting:", file)
// Update the fileinfo information used for future comparisons, and the last_seen counter
newinfo.Fileinfo = newFile.FileInfo
newinfo.Last_seen = p.iteration

if !oldFile.IsSameFile(&newFile) {
if previous := p.isFileRenamed(file, newFile.FileInfo, missinginfo); previous != "" {
// This file was renamed from another file we know - link the same harvester channel as the old file
logp.Info("prospector", "File rename was detected: %s -> %s", previous, file)
logp.Info("prospector", "Launching harvester on renamed file: %s", file)
logp.Debug("prospector", "File rename was detected: %s -> %s", previous, file)
logp.Debug("prospector", "Launching harvester on renamed file: %s", file)

newinfo.Harvester = p.prospectorinfo[previous].Harvester
} else {
// File is not the same file we saw previously, it must have rotated and is a new file
logp.Info("prospector", "Launching harvester on rotated file: %s", file)
logp.Debug("prospector", "Launching harvester on rotated file: %s", file)

// Forget about the previous harvester and let it continue on the old file - so start a new channel to use with the new harvester
newinfo.Harvester = make(chan int64, 1)
Expand All @@ -265,14 +266,14 @@ func (p *Prospector) scan(path string, output chan *FileEvent, resume *Prospecto
missinginfo[file] = oldFile.FileInfo
} else if len(newinfo.Harvester) != 0 && oldFile.FileInfo.ModTime() != newFile.FileInfo.ModTime() {
// Resume harvesting of an old file we've stopped harvesting from
logp.Info("prospector", "Resuming harvester on an old file that was just modified: %s", file)
logp.Debug("prospector", "Resuming harvester on an old file that was just modified: %s", file)

// Start a harvester on the path; an old file was just modified and it doesn't have a harvester
// The offset to continue from will be stored in the harvester channel - so take that to use and also clear the channel
harvester := &Harvester{Path: file, FileConfig: p.FileConfig, Offset: <-newinfo.Harvester, FinishChan: newinfo.Harvester}
go harvester.Harvest(output)
} else {
logp.Info("prospector", "Not harvesting, harvester probably still busy: ", file)
logp.Debug("prospector", "Not harvesting, harvester probably still busy: ", file)
}
}

Expand All @@ -296,15 +297,15 @@ func (p *Prospector) calculateResume(file string, fileinfo os.FileInfo, resume *
// File has rotated between shutdown and startup
// We return last state downstream, with a modified event source with the new file name
// And return the offset - also force harvest in case the file is old and we're about to skip it
logp.Info("prospector", "Detected rename of a previously harvested file: %s -> %s", previous, file)
logp.Debug("prospector", "Detected rename of a previously harvested file: %s -> %s", previous, file)
last_state := resume.Files[previous]
last_state.Source = &file
resume.Persist <- last_state
return last_state.Offset, true
}

if is_found {
logp.Info("prospector", "Not resuming rotated file: %s", file)
logp.Debug("prospector", "Not resuming rotated file: %s", file)
}

// New file so just start from an automatic position
Expand Down
9 changes: 5 additions & 4 deletions input/file.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package input

import (
"github.com/elastic/libbeat/logp"
"os"

"github.com/elastic/libbeat/logp"
)

type FileEvent struct {
Expand Down Expand Up @@ -49,18 +50,18 @@ func (f *FileEvent) GetState() *FileState {
// Check that the file isn't a symlink, mode is regular or file is nil
func (f *File) IsRegularFile() bool {
if f.File == nil {
logp.Info("Harvester: BUG: f arg is nil")
logp.Critical("Harvester: BUG: f arg is nil")
return false
}

info, e := f.File.Stat()
if e != nil {
logp.Info("File check fault: stat error: %s", e.Error())
logp.Err("File check fault: stat error: %s", e.Error())
return false
}

if !info.Mode().IsRegular() {
logp.Info("Harvester: not a regular file: %q %s", info.Mode(), info.Name())
logp.Warn("Harvester: not a regular file: %q %s", info.Mode(), info.Name())
return false
}
return true
Expand Down
Loading

0 comments on commit cf34826

Please sign in to comment.