
Unable to collect metrics from kafka to influx #5361

Closed
tcho16 opened this issue Feb 1, 2019 · 4 comments
tcho16 commented Feb 1, 2019

I am pushing data to my Kafka setup, and I can see it arriving successfully: when I run the kafka-console-consumer script, the messages appear. My setup is:
Kafka -> Telegraf -> Influx
Telegraf, however, does not seem to be picking up the data. All of these tools are installed locally on my machine, so I am using localhost with default settings (no SSL, etc.).

Relevant telegraf.conf:

# Global tags can be specified here in key="value" format.

[global_tags]

# Configuration for telegraf agent

[agent]
  interval = "1s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  debug = true
  quiet = false
  logfile = ""
  hostname = "localhost"
  omit_hostname = false


###############################################################################
#                            OUTPUT PLUGINS                                   #
###############################################################################
# Configuration for sending metrics to InfluxDB
[[outputs.influxdb]]
  ## The full HTTP or UDP URL for your InfluxDB instance.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  # urls = ["unix:///var/run/influxdb.sock"]
  # urls = ["udp://127.0.0.1:8089"]
  urls = ["http://127.0.0.1:8086"]

  ## The target database for metrics; will be created as needed.
  database = "telegraf"

  ## If true, no CREATE DATABASE queries will be sent.  Set to true when using
  ## Telegraf with a user without permissions to create databases or when the
  ## database already exists.
  # skip_database_creation = false

  ## Name of existing retention policy to write to.  Empty string writes to
  ## the default retention policy.  Only takes effect when using HTTP.
  # retention_policy = ""

  ## Write consistency (clusters only), can be: "any", "one", "quorum", "all".
  ## Only takes effect when using HTTP.
  # write_consistency = "any"

  ## Timeout for HTTP messages.
  # timeout = "5s"

  ## HTTP Basic Auth
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## HTTP User-Agent
  # user_agent = "telegraf"

  ## UDP payload size is the maximum packet size to send.
  # udp_payload = "512B"

  ## Optional TLS Config for use on HTTP connections.
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## HTTP Proxy override, if unset values the standard proxy environment
  ## variables are consulted to determine which proxy, if any, should be used.
  # http_proxy = "http://corporate.proxy:3128"

  ## Additional HTTP headers
  # http_headers = {"X-Special-Header" = "Special-Value"}

  ## HTTP Content-Encoding for write request body, can be set to "gzip" to
  ## compress body or "identity" to apply no encoding.
  # content_encoding = "identity"

  ## When true, Telegraf will output unsigned integers as unsigned values,
  ## i.e.: "42u".  You will need a version of InfluxDB supporting unsigned
  ## integer values.  Enabling this option will result in field type errors if
  ## existing data has been written.
  # influx_uint_support = false

###############################################################################
#                            INPUT PLUGINS                                    #
###############################################################################

[[inputs.kafka_consumer]]
  ## kafka servers
  brokers = ["localhost:9092"]
  ## topic(s) to consume
  topics = ["com.sample"]
  ## Add topic as tag if topic_tag is not empty
  # topic_tag = ""

  ## Optional Client id
  # client_id = "Telegraf"

  ## Set the minimal supported Kafka version.  Setting this enables the use of new
  ## Kafka features and APIs.  Of particular interest, lz4 compression
  ## requires at least version 0.10.0.0.
  ##   ex: version = "1.1.0"
  # version = ""

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Optional SASL Config
  # sasl_username = "kafka"
  # sasl_password = "secret"

  ## the name of the consumer group
  consumer_group = "com.sample"
  ## Offset (must be either "oldest" or "newest")
  offset = "oldest"

  ## Maximum length of a message to consume, in bytes (default 0/unlimited);
  ## larger messages are dropped
  max_message_len = 1000000

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

System info:

Telegraf 1.9.3
Windows 10
Kafka 2.11-2.1.0
Influx 1.7.3

Steps to reproduce:

Start zookeeper + kafka
Start influx
Upon starting up telegraf with the above config, the following error appears:
E! [inputs.kafka_consumer]: Error in plugin: metric parse error: expected field at offset 54: "jvm_memory_used,area=heap,host=localhost,id=PS\\ Old\\ Gen,metric_type=gauge,statistic=value value=22579376 1549026687000000000\n"

Expected behavior:

Connect successfully

Actual behavior:

Exception being thrown


@danielnelson (Contributor) commented:

Merging with #4426; this is a bug in the "influx" parser. If possible, you could emit this item with one less backslash:

- jvm_memory_used,area=heap,host=localhost,id=PS\\ Old\\ Gen,metric_type=gauge,statistic=value value=22579376 1549026687000000000
+ jvm_memory_used,area=heap,host=localhost,id=PS\ Old\ Gen,metric_type=gauge,statistic=value value=22579376 1549026687000000000
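For reference, line protocol escapes a space, comma, or equals sign in a tag value with a single backslash; a doubled backslash escapes the backslash itself, leaving the following space unescaped and ending the tag section early, which is why the parser then expects a field. A minimal sketch of the escaping rule (`escape_tag_value` is an illustrative helper, not part of Telegraf or Micrometer):

```python
# Sketch of InfluxDB line protocol tag-value escaping (not Telegraf's
# actual parser). Spaces, commas, and '=' in a tag value each get a
# single leading backslash.

def escape_tag_value(value: str) -> str:
    """Escape a tag value for InfluxDB line protocol."""
    out = value
    for ch in (",", "=", " "):
        out = out.replace(ch, "\\" + ch)
    return out

print(escape_tag_value("PS Old Gen"))  # PS\ Old\ Gen
```

Producing `id=PS\ Old\ Gen` instead of `id=PS\\ Old\\ Gen` yields a metric the "influx" parser accepts.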

tcho16 commented Feb 5, 2019

I have a regex like the following but it still does not pick it up

[[processors.regex]]
  namepass = ["jvm_memory_max"]

  [[processors.regex.tags]]
    key = "id"
    pattern = '(\\{2})'
    replacement = '\'

  [[processors.regex.fields]]
    key = "id"
    pattern = '(\\{2})'
    replacement = '\'

@danielnelson (Contributor) commented:

If this metric is being produced by Telegraf, I think it can be corrected more easily with the strings processor:

[[processors.strings]]
  [[processors.strings.replace]]
    tag = "id"
    old = '\\'
    new = ''

tcho16 commented Feb 6, 2019

I'll try that, but the metrics are being generated from a Spring Boot application using Spring Micrometer.

EDIT: That doesn't work. I'm trying the following regex (([a-zA-z]+)\\{2})+ but it doesn't seem to be targeting the id field.
