Skip to content

Commit

Permalink
Improve ECS categorization field mapping in kafka module (#16645)
Browse files Browse the repository at this point in the history
- event.kind
- event.type
- convert pipeline to yaml

Closes #16167
  • Loading branch information
leehinman authored Mar 2, 2020
1 parent 07b03a7 commit e7f9335
Show file tree
Hide file tree
Showing 11 changed files with 405 additions and 88 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Improve the decode_cef processor by reducing the number of memory allocations. {pull}16587[16587]
- Add `cloudfoundry` input to send events from Cloud Foundry. {pull}16586[16586]
- Improve ECS categorization field mappings in iis module. {issue}16165[16165] {pull}16618[16618]
- Improve ECS categorization field mapping in kafka module. {issue}16167[16167] {pull}16645[16645]

*Heartbeat*

Expand Down
87 changes: 0 additions & 87 deletions filebeat/module/kafka/log/ingest/pipeline.json

This file was deleted.

73 changes: 73 additions & 0 deletions filebeat/module/kafka/log/ingest/pipeline.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
description: Pipeline for parsing Kafka log messages
processors:
- grok:
field: message
trace_match: true
patterns:
- (?m)%{TIMESTAMP_ISO8601:kafka.log.timestamp}. %{LOGLEVEL:log.level} +%{JAVALOGMESSAGE:message}
\(%{JAVACLASS:kafka.log.class}\)$[ \n]*(?'kafka.log.trace.full'.*)
- grok:
field: message
pattern_definitions:
KAFKA_COMPONENT: '[^\]]*'
patterns:
- \[%{KAFKA_COMPONENT:kafka.log.component}\][,:.]? +%{JAVALOGMESSAGE:message}
on_failure:
- set:
field: kafka.log.component
value: unknown
- grok:
field: kafka.log.trace.full
ignore_missing: true
patterns:
- '%{JAVACLASS:kafka.log.trace.class}:\s*%{JAVALOGMESSAGE:kafka.log.trace.message}'
on_failure:
- remove:
field: kafka.log.trace
- remove:
field: kafka.log.trace.full
ignore_missing: true
- rename:
field: '@timestamp'
target_field: event.created
- date:
if: ctx.event.timezone == null
field: kafka.log.timestamp
target_field: '@timestamp'
formats:
- yyyy-MM-dd HH:mm:ss,SSS
on_failure:
- append:
field: error.message
value: '{{ _ingest.on_failure_message }}'
- date:
if: ctx.event.timezone != null
field: kafka.log.timestamp
target_field: '@timestamp'
formats:
- yyyy-MM-dd HH:mm:ss,SSS
timezone: '{{ event.timezone }}'
on_failure:
- append:
field: error.message
value: '{{ _ingest.on_failure_message }}'
- remove:
field: kafka.log.timestamp
- set:
field: event.kind
value: event
- script:
lang: painless
source: >-
def errorLevels = ["ERROR", "FATAL"];
if (ctx?.log?.level != null) {
if (errorLevels.contains(ctx.log.level)) {
ctx.event.type = "error";
} else {
ctx.event.type = "info";
}
}
on_failure:
- set:
field: error.log
value: '{{ _ingest.on_failure_message }}'
2 changes: 1 addition & 1 deletion filebeat/module/kafka/log/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ var:
- "{{.kafka_home}}/logs/state-change.log*"
- "{{.kafka_home}}/logs/kafka-*.log*"

ingest_pipeline: ingest/pipeline.json
ingest_pipeline: ingest/pipeline.yml
input: config/log.yml
Loading

0 comments on commit e7f9335

Please sign in to comment.