-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This PR adds support for Kubernetes as a processor for Beats. The Kubernetes processor allows Beats to enrich events with metadata coming from the Kubernetes Pod from which the event originated. Metadata might include: * pod name * pod namespace * container name The Kubernetes processor relies on two constructs: * Indexers - used to generate metadata from Pods and store them with unique keys. Example of keys might be container IDs, IP:Port combinations, PodName. * Matchers - used to generate a lookup key from an event. Example of a matcher might be a log_path matcher that looks at the source field of an event, extract the container ID from the log path and use it as a lookup key to retrieve metadata about the Pod from which the log message originated.
- Loading branch information
Showing
169 changed files
with
184,909 additions
and
213 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
package kubernetes | ||
|
||
import ( | ||
"fmt" | ||
"strings" | ||
|
||
"github.com/elastic/beats/libbeat/common" | ||
"github.com/elastic/beats/libbeat/logp" | ||
"github.com/elastic/beats/libbeat/processors/annotate/kubernetes" | ||
) | ||
|
||
func init() { | ||
kubernetes.Indexing.AddMatcher(LogPathMatcherName, newLogsPathMatcher) | ||
|
||
indexer := kubernetes.Indexing.GetIndexer(kubernetes.ContainerIndexerName) | ||
//Add a container indexer by default. | ||
if indexer != nil { | ||
cfg := common.NewConfig() | ||
container, err := indexer(*cfg) | ||
|
||
if err == nil { | ||
kubernetes.Indexing.AddDefaultIndexer(container) | ||
} else { | ||
logp.Err("Unable to load indexer plugin due to error: %v", err) | ||
} | ||
} else { | ||
logp.Err("Unable to get indexer plugin %s", kubernetes.ContainerIndexerName) | ||
} | ||
|
||
//Add a log path matcher which can extract container ID from the "source" field. | ||
matcher := kubernetes.Indexing.GetMatcher(LogPathMatcherName) | ||
|
||
if matcher != nil { | ||
cfg := common.NewConfig() | ||
logsPathMatcher, err := matcher(*cfg) | ||
if err == nil { | ||
kubernetes.Indexing.AddDefaultMatcher(logsPathMatcher) | ||
} else { | ||
logp.Err("Unable to load matcher plugin due to error: %v", err) | ||
} | ||
} else { | ||
logp.Err("Unable to get matcher plugin %s", LogPathMatcherName) | ||
} | ||
|
||
} | ||
|
||
const LogPathMatcherName = "logs_path" | ||
|
||
type LogPathMatcher struct { | ||
LogsPath string | ||
} | ||
|
||
func newLogsPathMatcher(cfg common.Config) (kubernetes.Matcher, error) { | ||
config := struct { | ||
LogsPath string `config:"logs_path"` | ||
}{ | ||
LogsPath: "/var/lib/docker/containers/", | ||
} | ||
|
||
err := cfg.Unpack(&config) | ||
if err != nil || config.LogsPath == "" { | ||
return nil, fmt.Errorf("fail to unpack the `logs_path` configuration: %s", err) | ||
} | ||
|
||
logPath := config.LogsPath | ||
if logPath[len(logPath)-1:] != "/" { | ||
logPath = logPath + "/" | ||
} | ||
|
||
return &LogPathMatcher{LogsPath: logPath}, nil | ||
} | ||
|
||
func (f *LogPathMatcher) MetadataIndex(event common.MapStr) string { | ||
|
||
if value, ok := event["source"]; ok { | ||
source := value.(string) | ||
logp.Debug("kubernetes", "Incoming source value: ", source) | ||
cid := "" | ||
if strings.Contains(source, f.LogsPath) { | ||
//Docker container is 64 chars in length | ||
cid = source[len(f.LogsPath) : len(f.LogsPath)+64] | ||
} | ||
logp.Debug("kubernetes", "Using container id: ", cid) | ||
|
||
if cid != "" { | ||
return cid | ||
} | ||
} | ||
|
||
return "" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package kubernetes | ||
|
||
import ( | ||
"fmt" | ||
"github.com/elastic/beats/libbeat/common" | ||
"github.com/stretchr/testify/assert" | ||
"testing" | ||
) | ||
|
||
func TestLogsPathMatcher(t *testing.T) { | ||
var testConfig = common.NewConfig() | ||
|
||
logMatcher, err := newLogsPathMatcher(*testConfig) | ||
assert.Nil(t, err) | ||
|
||
cid := "0069869de9adf97f574c62029aeba65d1ecd85a2a112e87fbc28afe4dec2b843" | ||
logPath := fmt.Sprintf("/var/lib/docker/containers/%s/%s-json.log", cid, cid) | ||
|
||
input := common.MapStr{ | ||
"source": "/var/log/messages", | ||
} | ||
|
||
output := logMatcher.MetadataIndex(input) | ||
assert.Equal(t, output, "") | ||
|
||
input["source"] = logPath | ||
output = logMatcher.MetadataIndex(input) | ||
|
||
assert.Equal(t, output, cid) | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.