This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit 0f2bb1e: fix data loss between logrotate
rockb1017 committed May 15, 2021
1 parent 8b85a69
Showing 5 changed files with 131 additions and 158 deletions.
2 changes: 1 addition & 1 deletion operator/builtin/input/file/config.go
@@ -151,10 +151,10 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
firstCheck: true,
cancel: func() {},
knownFiles: make([]*Reader, 0, 10),
tailingReaders: map[string][]*Reader{},
fingerprintSize: int(c.FingerprintSize),
MaxLogSize: int(c.MaxLogSize),
MaxConcurrentFiles: c.MaxConcurrentFiles,
SeenPaths: make(map[string]struct{}, 100),
}

return []operator.Operator{op}, nil
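
The heart of the change is visible in this hunk: the operator's state moves from a flat knownFiles slice to a tailingReaders map keyed by file path, where each value holds the reader for the live file plus readers for any rotated copies that still have unread data. A minimal sketch of that shape, using a simplified placeholder type rather than the operator's real Reader:

package main

import "fmt"

// reader is a stand-in for the operator's *Reader type.
type reader struct {
	path   string
	offset int64
}

func main() {
	// One key per watched path; the slice holds the current file
	// plus any rotated copies that still have unread data.
	tailingReaders := map[string][]*reader{}

	// First sighting of a path: start a single reader.
	tailingReaders["/var/log/app.log"] = []*reader{{path: "/var/log/app.log"}}

	// Rotation detected (same path, new fingerprint): append a new
	// reader instead of discarding the old one, so the remaining
	// bytes of the rotated file are not lost.
	tailingReaders["/var/log/app.log"] = append(
		tailingReaders["/var/log/app.log"],
		&reader{path: "/var/log/app.log"},
	)

	fmt.Println(len(tailingReaders["/var/log/app.log"])) // 2
}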
264 changes: 117 additions & 147 deletions operator/builtin/input/file/file.go
@@ -45,12 +45,13 @@ type InputOperator struct
SplitFunc bufio.SplitFunc
MaxLogSize int
MaxConcurrentFiles int
SeenPaths map[string]struct{}

persister operator.Persister

knownFiles []*Reader
queuedMatches []string
tailingReaders map[string][]*Reader
readers []*Reader

startAtBeginning bool

@@ -87,6 +88,12 @@ func (f *InputOperator) Stop() error {
f.wg.Wait()
f.knownFiles = nil
f.cancel = nil
for _, readers := range f.tailingReaders {
for _, reader := range readers {
f.closeFile(reader.file, reader.Path)
}
}
f.tailingReaders = nil
return nil
}
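
For context on the teardown added above: Stop now walks every tracked reader across all paths and closes its file handle before dropping the map. A small self-contained sketch of that pattern, with a simplified reader type standing in for the real one:

package main

import (
	"fmt"
	"os"
)

type reader struct {
	file *os.File
	path string
}

// drainAll closes every file handle tracked across all paths,
// mirroring the cleanup the commit adds to Stop.
func drainAll(tailing map[string][]*reader) {
	for _, readers := range tailing {
		for _, r := range readers {
			if err := r.file.Close(); err != nil {
				fmt.Printf("error closing %s: %v\n", r.path, err)
			}
		}
	}
}

func main() {
	f, err := os.Open(os.DevNull)
	if err != nil {
		panic(err)
	}
	tailing := map[string][]*reader{os.DevNull: {{file: f, path: os.DevNull}}}
	drainAll(tailing)
	fmt.Println("all handles closed")
}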

@@ -111,6 +118,7 @@ func (f *InputOperator) startPoller(ctx context.Context) {
}()
}

const RotatedFileTrackingLimit = 10240

// poll checks all the watched paths for new entries
func (f *InputOperator) poll(ctx context.Context) {
var matches []string
@@ -120,12 +128,6 @@ func (f *InputOperator) poll(ctx context.Context) {
if len(f.queuedMatches) > 0 {
matches, f.queuedMatches = f.queuedMatches, make([]string, 0)
} else {
// Increment the generation on all known readers
// This is done here because the next generation is about to start
for i := 0; i < len(f.knownFiles); i++ {
f.knownFiles[i].generation++
}

// Get the list of paths on disk
matches = getMatches(f.Include, f.Exclude)
if f.firstCheck && len(matches) == 0 {
@@ -135,47 +137,67 @@ }
}
}
}

// Open the files first to minimize the time between listing and opening
files := make([]*os.File, 0, len(matches))
for _, path := range matches {
if _, ok := f.SeenPaths[path]; !ok {
if f.startAtBeginning {
f.Infow("Started watching file", "path", path)
} else {
f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
}
f.SeenPaths[path] = struct{}{}
}
file, err := os.Open(path)
toTail, file, fp, err := f.isNewFileToTail(path)
if err != nil {
f.Errorw("Failed to open file", zap.Error(err))
continue
}
files = append(files, file)
}
if !toTail {
continue
}

readers := f.makeReaders(files)
if _, ok := f.tailingReaders[path]; !ok { // If path doesn't exist => new file to tail
reader, err := f.NewReader(path, file, fp)
if err != nil {
f.Errorw("Failed to make reader for " + path, zap.Error(err))
f.closeFile(reader.file, reader.Path)
continue
}
if f.firstCheck {
if f.startAtBeginning {
f.Infow("Started watching file", "path", path)
} else {
f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
}
if err := reader.InitializeOffset(f.startAtBeginning); err != nil {
f.Errorw("Failed to initialize offset for " + path , zap.Error(err))
}
} else {
f.Infow("Started watching file", "path", path)
}
f.tailingReaders[path] = []*Reader{reader}
} else { // If path exists
reader, err := f.NewReader(path, file, fp)
if err != nil {
f.Errorw("Failed to make reader for " + path, zap.Error(err))
f.closeFile(file, path)
continue
}
f.Infow("Log rotation detected. Started watching file", "path", path)
f.tailingReaders[path] = append(f.tailingReaders[path], reader)
// Limit the number of rotated files tracked per path
if len(f.tailingReaders[path]) > RotatedFileTrackingLimit {
var firstReader *Reader
firstReader, f.tailingReaders[path] = f.tailingReaders[path][0], f.tailingReaders[path][1:]
f.closeFile(firstReader.file, firstReader.Path)
}
}
}
f.firstCheck = false

var wg sync.WaitGroup
for _, reader := range readers {
wg.Add(1)
go func(r *Reader) {
defer wg.Done()
r.ReadToEnd(ctx)
}(reader)
for _, readers := range f.tailingReaders {
for _, reader := range readers {
wg.Add(1)
go func(r *Reader) {
defer wg.Done()
r.ReadToEnd(ctx)
}(reader)
}
}

// Wait until all the reader goroutines are finished
wg.Wait()

// Close all files
for _, file := range files {
file.Close()
}

f.saveCurrent(readers)
f.syncLastPollFiles(ctx)
}
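
The rewritten poll loop above is where the data-loss fix lives: when a path already present in tailingReaders reappears with an unrecognized fingerprint, the file was rotated, so a fresh reader is appended instead of replacing the old one, and RotatedFileTrackingLimit caps how many rotated readers a path may accumulate. A simplified sketch of that append-and-evict logic (placeholder types, and a small limit so the eviction is visible):

package main

import "fmt"

// The real code uses a limit of 10240; a small value here makes the
// eviction visible.
const rotatedFileTrackingLimit = 3

type reader struct{ id int }

// track appends a reader for a newly rotated file and evicts the
// oldest tracked reader once the per-path limit is exceeded.
func track(tailing map[string][]*reader, path string, r *reader) (evicted *reader) {
	tailing[path] = append(tailing[path], r)
	if len(tailing[path]) > rotatedFileTrackingLimit {
		evicted, tailing[path] = tailing[path][0], tailing[path][1:]
	}
	return evicted
}

func main() {
	tailing := map[string][]*reader{}
	for i := 1; i <= 5; i++ {
		if old := track(tailing, "/var/log/app.log", &reader{id: i}); old != nil {
			fmt.Printf("evicted reader %d, closing its file handle\n", old.id)
		}
	}
	fmt.Println("still tracked:", len(tailing["/var/log/app.log"])) // prints 3
}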

@@ -205,113 +227,38 @@ func getMatches(includes, excludes []string) []string {
return all
}

// makeReaders takes a list of paths, then creates readers from each of those paths,
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (f *InputOperator) makeReaders(files []*os.File) []*Reader {
// Get fingerprints for each file
fps := make([]*Fingerprint, 0, len(files))
for _, file := range files {
fp, err := f.NewFingerprint(file)
if err != nil {
f.Errorw("Failed creating fingerprint", zap.Error(err))
continue
}
fps = append(fps, fp)
}

// Make a copy of the files so we don't modify the original
filesCopy := make([]*os.File, len(files))
copy(filesCopy, files)

// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files
OUTER:
for i := 0; i < len(fps); {
fp := fps[i]
if len(fp.FirstBytes) == 0 {
// Empty file, don't read it until we can compare its fingerprint
fps = append(fps[:i], fps[i+1:]...)
filesCopy = append(filesCopy[:i], filesCopy[i+1:]...)
}

for j := 0; j < len(fps); j++ {
if i == j {
// Skip checking itself
continue
}

fp2 := fps[j]
if fp.StartsWith(fp2) || fp2.StartsWith(fp) {
// Exclude
fps = append(fps[:i], fps[i+1:]...)
filesCopy = append(filesCopy[:i], filesCopy[i+1:]...)
continue OUTER
}
}
i++
}

readers := make([]*Reader, 0, len(fps))
for i := 0; i < len(fps); i++ {
reader, err := f.newReader(filesCopy[i], fps[i], f.firstCheck)
if err != nil {
f.Errorw("Failed to create reader", zap.Error(err))
continue
}
readers = append(readers, reader)
// isNewFileToTail compares a file's fingerprint against the readers that are already tailing to determine whether the file is new
func (f *InputOperator) isNewFileToTail(path string) (bool, *os.File, *Fingerprint, error) {
file, err := os.Open(path)
if err != nil {
f.Errorw("Failed to open file", zap.Error(err))
return false, nil, nil, err
}

return readers
}

// saveCurrent adds the readers from this polling interval to this list of
// known files, then increments the generation of all tracked old readers
// before clearing out readers that have existed for 3 generations.
func (f *InputOperator) saveCurrent(readers []*Reader) {
// Add readers from the current, completed poll interval to the list of known files
f.knownFiles = append(f.knownFiles, readers...)

// Clear out old readers. They are sorted such that they are oldest first,
// so we can just find the first reader whose generation is less than our
// max, and keep every reader after that
for i := 0; i < len(f.knownFiles); i++ {
reader := f.knownFiles[i]
if reader.generation <= 3 {
f.knownFiles = f.knownFiles[i:]
break
}
fp, err := f.NewFingerprint(file)
if err != nil {
f.Errorw("Failed to make fingerprint", zap.Error(err))
f.closeFile(file, path)
return false, nil, nil, err
}
}

func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck bool) (*Reader, error) {
// Check if the new path has the same fingerprint as an old path
if oldReader, ok := f.findFingerprintMatch(fp); ok {
newReader, err := oldReader.Copy(file)
if err != nil {
return nil, err
}
newReader.Path = file.Name()
return newReader, nil
if len(fp.FirstBytes) == 0 {
f.closeFile(file, path)
return false, nil, nil, nil
}

// If we don't match any previously known files, create a new reader from scratch
newReader, err := f.NewReader(file.Name(), file, fp)
if err != nil {
return nil, err
}
startAtBeginning := !firstCheck || f.startAtBeginning
if err := newReader.InitializeOffset(startAtBeginning); err != nil {
return nil, fmt.Errorf("initialize offset: %s", err)
_, exist := f.findFingerprintMatch(fp)
if exist { // Already tailing
f.closeFile(file, path)
return false, nil, nil, nil
}
return newReader, nil
return true, file, fp, nil
}
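
isNewFileToTail hinges on fingerprint comparison: if the first bytes of the candidate file prefix-match the fingerprint of a reader that is already tailing, it is the same file (or a copy) and is skipped. A minimal sketch of that prefix check, assuming a fingerprint is simply the leading bytes of a file:

package main

import (
	"bytes"
	"fmt"
)

// fingerprint stands in for the operator's Fingerprint type: the
// first N bytes read from a file.
type fingerprint struct{ firstBytes []byte }

// startsWith reports whether the tracked fingerprint is a prefix of
// this one, meaning the two files almost certainly share an origin.
func (f fingerprint) startsWith(old fingerprint) bool {
	return len(old.firstBytes) > 0 && bytes.HasPrefix(f.firstBytes, old.firstBytes)
}

func main() {
	tracked := fingerprint{firstBytes: []byte("2021-05-15 INFO start")}
	grown := fingerprint{firstBytes: []byte("2021-05-15 INFO start\n2021-05-15 INFO more")}
	rotated := fingerprint{firstBytes: []byte("2021-05-16 INFO fresh")}

	fmt.Println(grown.startsWith(tracked))   // true: already tailing, skip it
	fmt.Println(rotated.startsWith(tracked)) // false: new file, start a reader
}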

func (f *InputOperator) findFingerprintMatch(fp *Fingerprint) (*Reader, bool) {
// Iterate backwards to match newest first
for i := len(f.knownFiles) - 1; i >= 0; i-- {
oldReader := f.knownFiles[i]
if fp.StartsWith(oldReader.Fingerprint) {
return oldReader, true
for _, readers := range f.tailingReaders {
for _, reader := range readers {
if fp.StartsWith(reader.Fingerprint) {
return reader, true
}
}
}
return nil, false
@@ -324,15 +271,16 @@ func (f *InputOperator) syncLastPollFiles(ctx context.Context) {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)

// Encode the number of known files
if err := enc.Encode(len(f.knownFiles)); err != nil {
// Encode the number of known files.
if err := enc.Encode(len(f.tailingReaders)); err != nil {
f.Errorw("Failed to encode known files", zap.Error(err))
return
}

// Encode each known file
for _, fileReader := range f.knownFiles {
if err := enc.Encode(fileReader); err != nil {
// Encode each known file. For readers that share the same path, encode only the latest one.
for _, readers := range f.tailingReaders {
lastReader := readers[len(readers)-1]
if err := enc.Encode(lastReader); err != nil {
f.Errorw("Failed to encode known files", zap.Error(err))
}
}
@@ -349,31 +297,53 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error {
return err
}

f.tailingReaders = map[string][]*Reader{}
if encoded == nil {
f.knownFiles = make([]*Reader, 0, 10)
return nil
}

dec := json.NewDecoder(bytes.NewReader(encoded))

// Decode the number of entries
var knownFileCount int
if err := dec.Decode(&knownFileCount); err != nil {
return fmt.Errorf("decoding file count: %w", err)
}

// Decode each of the known files
f.knownFiles = make([]*Reader, 0, knownFileCount)
for i := 0; i < knownFileCount; i++ {
newReader, err := f.NewReader("", nil, nil)
decodedReader, err := f.NewReader("", nil, nil)
if err != nil {
return err
}
if err = dec.Decode(newReader); err != nil {
if err = dec.Decode(decodedReader); err != nil {
return err
}
f.knownFiles = append(f.knownFiles, newReader)
path := decodedReader.Path
file, err := os.Open(path)
if err != nil {
f.Errorw("Failed to open file while recovering checkpoints", zap.Error(err))
f.tailingReaders[path] = []*Reader{decodedReader}
continue
}

restoredReader, err := f.NewReader(path, file, decodedReader.Fingerprint)
if err != nil {
f.Errorw("Failed to restore a Reader", zap.Error(err), "path", path)
continue
}
restoredReader.Offset = decodedReader.Offset
f.tailingReaders[path] = []*Reader{restoredReader}
}

for _, readers := range f.tailingReaders {
f.Infof("Load successful: %+v", readers)
}

return nil
}
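
The persistence format used by syncLastPollFiles and loadLastPollFiles above is a count followed by one JSON document per tracked path, keeping only the newest reader for each path. A self-contained sketch of that encode/decode round trip, with a simplified checkpoint struct in place of the real Reader:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// checkpoint is a stand-in for the persisted fields of a Reader.
type checkpoint struct {
	Path   string
	Offset int64
}

func main() {
	saved := []checkpoint{{Path: "/var/log/app.log", Offset: 4096}}

	// Encode: the count first, then one JSON document per entry.
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.Encode(len(saved))
	for _, cp := range saved {
		enc.Encode(cp)
	}

	// Decode: read the count, then that many entries. On restore the
	// real code reopens each path and seeds a reader with the offset.
	dec := json.NewDecoder(bytes.NewReader(buf.Bytes()))
	var n int
	dec.Decode(&n)
	restored := make([]checkpoint, 0, n)
	for i := 0; i < n; i++ {
		var cp checkpoint
		dec.Decode(&cp)
		restored = append(restored, cp)
	}
	fmt.Printf("%+v\n", restored) // [{Path:/var/log/app.log Offset:4096}]
}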

func (f *InputOperator) closeFile(file *os.File, path string) {
err := file.Close()
if err != nil {
f.Warnw("Error closing a file", "file", path, "err", err)
}
}
