Skip to content

Commit

Permalink
Improve ECS categorization field mapping in kafka module
Browse files Browse the repository at this point in the history
- event.kind
- event.type
- convert pipeline to yaml

Closes elastic#16167
  • Loading branch information
leehinman committed Feb 26, 2020
1 parent 695b167 commit 0a217c3
Show file tree
Hide file tree
Showing 11 changed files with 405 additions and 88 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Improve ECS field mappings in aws module. {issue}16154[16154] {pull}16307[16307]
- Improve ECS categorization field mappings in googlecloud module. {issue}16030[16030] {pull}16500[16500]
- Improve ECS field mappings in haproxy module. {issue}16162[16162] {pull}16529[16529]
- Improve ECS categorization field mapping in kafka module. {issue}16167[16167] {pull}16645[16645]

*Heartbeat*

Expand Down
87 changes: 0 additions & 87 deletions filebeat/module/kafka/log/ingest/pipeline.json

This file was deleted.

73 changes: 73 additions & 0 deletions filebeat/module/kafka/log/ingest/pipeline.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
description: Pipeline for parsing Kafka log messages
processors:
- grok:
field: message
trace_match: true
patterns:
- (?m)%{TIMESTAMP_ISO8601:kafka.log.timestamp}. %{LOGLEVEL:log.level} +%{JAVALOGMESSAGE:message}
\(%{JAVACLASS:kafka.log.class}\)$[ \n]*(?'kafka.log.trace.full'.*)
- grok:
field: message
pattern_definitions:
KAFKA_COMPONENT: '[^\]]*'
patterns:
- \[%{KAFKA_COMPONENT:kafka.log.component}\][,:.]? +%{JAVALOGMESSAGE:message}
on_failure:
- set:
field: kafka.log.component
value: unknown
- grok:
field: kafka.log.trace.full
ignore_missing: true
patterns:
- '%{JAVACLASS:kafka.log.trace.class}:\s*%{JAVALOGMESSAGE:kafka.log.trace.message}'
on_failure:
- remove:
field: kafka.log.trace
- remove:
field: kafka.log.trace.full
ignore_missing: true
- rename:
field: '@timestamp'
target_field: event.created
- date:
if: ctx.event.timezone == null
field: kafka.log.timestamp
target_field: '@timestamp'
formats:
- yyyy-MM-dd HH:mm:ss,SSS
on_failure:
- append:
field: error.message
value: '{{ _ingest.on_failure_message }}'
- date:
if: ctx.event.timezone != null
field: kafka.log.timestamp
target_field: '@timestamp'
formats:
- yyyy-MM-dd HH:mm:ss,SSS
timezone: '{{ event.timezone }}'
on_failure:
- append:
field: error.message
value: '{{ _ingest.on_failure_message }}'
- remove:
field: kafka.log.timestamp
- set:
field: event.kind
value: event
- script:
lang: painless
source: >-
def errorLevels = ["ERROR", "FATAL"];
if (ctx?.log?.level != null) {
if (errorLevels.contains(ctx.log.level)) {
ctx.event.type = "error";
} else {
ctx.event.type = "info";
}
}
on_failure:
- set:
field: error.log
value: '{{ _ingest.on_failure_message }}'
2 changes: 1 addition & 1 deletion filebeat/module/kafka/log/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ var:
- "{{.kafka_home}}/logs/state-change.log*"
- "{{.kafka_home}}/logs/kafka-*.log*"

ingest_pipeline: ingest/pipeline.json
ingest_pipeline: ingest/pipeline.yml
input: config/log.yml
Loading

0 comments on commit 0a217c3

Please sign in to comment.