Skip to content

Commit

Permalink
Add initial support for configurable file identity tracking (elastic#…
Browse files Browse the repository at this point in the history
…18748)

## What does this PR do?

This PR adds a new option to the `log` input of Filebeat named `file_identity`. The option lets users configure file identity for state tracking.

### Available strategies
1. `native` (default): Filebeat identifies files based on their inode and device id.
2. `path`: Files are considered different if they have different paths.
3. `inode_marker`: A special marker file and the inode is used to tell apart files. It is not supported on Windows.

State IDs previously were not saved to the registry file. Now, these are persisted on the disk.

### Architecture

I introduced a new interface: `file.StateIdentifier`. The responsibility of `StateIdentifier` is to generate an identifier for a `file.State` based on the configuration. If someone wants to implement their own `StateIdentifier` method, all they need is to create a struct which satisfies this interface.

```golang
// StateIdentifier generates an ID for a State.
type StateIdentifier interface {
	// GenerateID generates and returns the ID of the state
	GenerateID(State) (stateId, identifierType string)
}
```

As every state has an ID, Filebeat just compares the IDs of the two states to decide if they belong to the same file or not.

The scope of the PR does not include strategies which include fingerprinting the contents of the file.
  • Loading branch information
kvch authored Jul 14, 2020
1 parent a930fee commit 8c5c5d4
Show file tree
Hide file tree
Showing 26 changed files with 856 additions and 186 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add experimental dataset sonicwall/firewall for Sonicwall Firewalls logs {pull}19713[19713]
- Add experimental dataset squid/log for Squid Proxy Server logs {pull}19713[19713]
- Add experimental dataset zscaler/zia for Zscaler Internet Access logs {pull}19713[19713]
- Add initial support for configurable file identity tracking. {pull}18748[18748]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ filebeat.inputs:
# are matching any regular expression from the list. By default, no files are dropped.
#exclude_files: ['.gz$']

# Method to determine if two files are the same or not. By default
# the Beat considers two files the same if their inode and device id are the same.
#file_identity.native: ~

# Optional additional fields. These fields can be freely picked
# to add additional information to the crawled log files for filtering
#fields:
Expand Down
45 changes: 45 additions & 0 deletions filebeat/docs/inputs/input-common-file-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ file is renamed or moved in such a way that it's no longer matched by the file
patterns specified for the , the file will not be picked up again.
{beatname_uc} will not finish reading the file.

Do not use this option when `path` based `file_identity` is configured. It does
not make sense to enable the option, as Filebeat cannot detect renames using
path names as unique identifiers.

WINDOWS: If your Windows log rotation system shows errors because it can't
rotate the files, you should enable this option.

Expand Down Expand Up @@ -397,3 +401,44 @@ file that hasn't been harvested for a longer period of time.
This configuration option applies per input. You can use this option to
indirectly set higher priorities on certain inputs by assigning a higher
limit of harvesters.

[float]
===== `file_identity`

Different `file_identity` methods can be configured to suit the
environment where you are collecting log messages.


*`native`*:: The default behaviour of {beatname_uc} is to differentiate
between files using their inodes and device ids.

[source,yaml]
----
file_identity.native: ~
----

*`path`*:: To identify files based on their paths use this strategy.

WARNING: Only use this strategy if your log files are rotated to a folder
outside of the scope of your input or not at all. Otherwise you end up
with duplicated events.

WARNING: This strategy does not support renaming files.
If an input file is renamed, {beatname_uc} will read it again if the new path
matches the settings of the input.

[source,yaml]
----
file_identity.path: ~
----

*`inode_marker`*:: If the device id changes from time to time, you must use
this method to distinguish files. This option is not supported on Windows.

Set the location of the marker file the following way:

[source,yaml]
----
file_identity.inode_marker.path: /logs/.filebeat-marker
----

53 changes: 53 additions & 0 deletions filebeat/docs/inputs/input-log.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,55 @@ multiple input sections:
IMPORTANT: Make sure a file is not defined more than once across all inputs
because this can lead to unexpected behaviour.

[[file-identity]]
==== Reading files on network shares and cloud providers

:WARNING: Filebeat does not support reading from network shares and cloud providers.

However, one of the limitations of these data sources can be mitigated
if you configure Filebeat adequately.

By default, {beatname_uc} identifies files based on their inodes and
device IDs. However, on network shares and cloud providers these
values might change during the lifetime of the file. If this happens
{beatname_uc} thinks that file is new and resends the whole content
of the file. To solve this problem you can configure `file_identity` option. Possible
values besides the default `inode_deviceid` are `path` and `inode_marker`.

Selecting `path` instructs {beatname_uc} to identify files based on their
paths. This is a quick way to aviod rereading files if inode and device ids
might change. However, keep in mind if the files are rotated (renamed), they
will be reread and resubmitted.

The option `inode_marker` can be used if the inodes stay the same even if
the device id is changed. You should choose this method if your files are
rotated instead of `path` if possible. You have to configure a marker file
readable by {beatname_uc} and set the path in the option `path` of `inode_marker`.

The content of this file must be unique to the device. You can put the
UUID of the device or mountpoint where the input is stored. The following
example oneliner generates a hidden marker file for the selected mountpoint `/logs`:
Please note that you should not use this option on Windows as file identifiers might be
more volatile.

["source","sh",subs="attributes"]
----
$ lsblk -o MOUNTPOINT,UUD | grep /logs | awk '{print $2}' >> /logs/.filebeat-marker
----

To set the generated file as a marker for `file_identity` you should configure
the input the following way:

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: log
paths:
- /logs/*.log
file_identity.inode_marker.path: /logs/.filebeat-marker
----


[[rotating-logs]]
==== Reading from rotating logs

Expand All @@ -66,6 +115,10 @@ a pattern that matches the file you want to harvest and all of its rotated
files. Also make sure your log rotation strategy prevents lost or duplicate
messages. For more information, see <<file-log-rotation>>.

Furthermore, to avoid duplicate of rotated log messages, do not use the
`path` method for `file_identity`. Or exclude the rotated files with `exclude_files`
option.

[id="{beatname_lc}-input-{type}-options"]
==== Configuration options

Expand Down
4 changes: 4 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ filebeat.inputs:
# are matching any regular expression from the list. By default, no files are dropped.
#exclude_files: ['.gz$']

# Method to determine if two files are the same or not. By default
# the Beat considers two files the same if their inode and device id are the same.
#file_identity.native: ~

# Optional additional fields. These fields can be freely picked
# to add additional information to the crawled log files for filtering
#fields:
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,8 @@ type File struct {
State *State
}

// Checks if the two files are the same.
func (f *File) IsSameFile(f2 *File) bool {
return os.SameFile(f.FileInfo, f2.FileInfo)
}

// IsSameFile checks if the given File path corresponds with the FileInfo given
// It is used to check if the file has been renamed.
func IsSameFile(path string, info os.FileInfo) bool {
fileInfo, err := os.Stat(path)

Expand Down
121 changes: 121 additions & 0 deletions filebeat/input/file/identifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package file

import (
"fmt"
"strconv"
"strings"

"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/v7/libbeat/common"
)

const (
nativeName = "native"
pathName = "path"
inodeMarkerName = "inode_marker"

DefaultIdentifierName = nativeName
identitySep = "::"
)

var (
identifierFactories = map[string]IdentifierFactory{
nativeName: newINodeDeviceIdentifier,
pathName: newPathIdentifier,
inodeMarkerName: newINodeMarkerIdentifier,
}
)

type IdentifierFactory func(*common.Config) (StateIdentifier, error)

// StateIdentifier generates an ID for a State.
type StateIdentifier interface {
// GenerateID generates and returns the ID of the state and its type
GenerateID(State) (id, identifierType string)
}

// NewStateIdentifier creates a new state identifier for a log input.
func NewStateIdentifier(ns *common.ConfigNamespace) (StateIdentifier, error) {
if ns == nil {
return newINodeDeviceIdentifier(nil)
}

identifierType := ns.Name()
f, ok := identifierFactories[identifierType]
if !ok {
return nil, fmt.Errorf("no such file_identity generator: %s", identifierType)
}

return f(ns.Config())
}

type inodeDeviceIdentifier struct {
name string
}

func newINodeDeviceIdentifier(_ *common.Config) (StateIdentifier, error) {
return &inodeDeviceIdentifier{
name: nativeName,
}, nil
}

func (i *inodeDeviceIdentifier) GenerateID(s State) (id, identifierType string) {
stateID := i.name + identitySep + s.FileStateOS.String()
return genIDWithHash(s.Meta, stateID), i.name
}

type pathIdentifier struct {
name string
}

func newPathIdentifier(_ *common.Config) (StateIdentifier, error) {
return &pathIdentifier{
name: pathName,
}, nil
}

func (p *pathIdentifier) GenerateID(s State) (id, identifierType string) {
stateID := p.name + identitySep + s.Source
return genIDWithHash(s.Meta, stateID), p.name
}

func genIDWithHash(meta map[string]string, fileID string) string {
if len(meta) == 0 {
return fileID
}

hashValue, _ := hashstructure.Hash(meta, nil)
var hashBuf [17]byte
hash := strconv.AppendUint(hashBuf[:0], hashValue, 16)
hash = append(hash, '-')

var b strings.Builder
b.Grow(len(hash) + len(fileID))
b.Write(hash)
b.WriteString(fileID)

return b.String()
}

// mockIdentifier is used for testing
type MockIdentifier struct{}

func (m *MockIdentifier) GenerateID(s State) (string, string) { return s.Id, "mock" }
98 changes: 98 additions & 0 deletions filebeat/input/file/identifier_inode_deviceid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !windows

package file

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

type inodeMarkerIdentifier struct {
log *logp.Logger
name string
markerPath string

markerFileLastModifitaion time.Time
markerTxt string
}

func newINodeMarkerIdentifier(cfg *common.Config) (StateIdentifier, error) {
var config struct {
MarkerPath string `config:"path" validate:"required"`
}
err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("error while reading configuration of INode + marker file configuration: %v", err)
}

fi, err := os.Stat(config.MarkerPath)
if err != nil {
return nil, fmt.Errorf("error while opening marker file at %s: %v", config.MarkerPath, err)
}
markerContent, err := ioutil.ReadFile(config.MarkerPath)
if err != nil {
return nil, fmt.Errorf("error while reading marker file at %s: %v", config.MarkerPath, err)
}
return &inodeMarkerIdentifier{
log: logp.NewLogger("inode_marker_identifier_" + filepath.Base(config.MarkerPath)),
name: inodeMarkerName,
markerPath: config.MarkerPath,
markerFileLastModifitaion: fi.ModTime(),
markerTxt: string(markerContent),
}, nil
}

func (i *inodeMarkerIdentifier) markerContents() string {
f, err := os.Open(i.markerPath)
if err != nil {
i.log.Errorf("Failed to open marker file %s: %v", i.markerPath, err)
return ""
}
defer f.Close()

fi, err := f.Stat()
if err != nil {
i.log.Errorf("Failed to fetch file information for %s: %v", i.markerPath, err)
return ""
}
if i.markerFileLastModifitaion.Before(fi.ModTime()) {
contents, err := ioutil.ReadFile(i.markerPath)
if err != nil {
i.log.Errorf("Error while reading contents of marker file: %v", err)
return ""
}
i.markerTxt = string(contents)
}

return i.markerTxt
}

func (i *inodeMarkerIdentifier) GenerateID(s State) (id, identifierType string) {
m := i.markerContents()

stateID := fmt.Sprintf("%s%s%s-%s", i.name, identitySep, s.FileStateOS.InodeString(), m)
return genIDWithHash(s.Meta, stateID), i.name
}
Loading

0 comments on commit 8c5c5d4

Please sign in to comment.