Add Kafka Consumer spec #8108

Merged 4 commits on Dec 9, 2020
245 changes: 245 additions & 0 deletions kafka_consumer/assets/configuration/spec.yaml
@@ -0,0 +1,245 @@
name: Kafka Consumer
files:
- name: kafka_consumer.yaml
options:
- template: init_config
overrides:
description: |
All options defined here are available to all instances.

WARNING: To avoid blindly collecting offsets and lag for an unbounded number
of partitions (as could be the case after enabling monitor_unlisted_consumer_groups
or monitor_all_broker_highwatermarks), the check collects metrics for at most 500 partitions.

DEPRECATION NOTICE: In the early days of Kafka, consumer offsets were stored in Zookeeper.
So this check currently supports fetching consumer offsets from both Kafka and Zookeeper.
However, Kafka 0.9 (released in 2015) deprecated Zookeeper storage. As a result, we have also
deprecated fetching consumer offsets from Zookeeper.
options:
- name: kafka_timeout
description: Customizes the Kafka connection timeout, in seconds.
value:
type: integer
example: 5
default: 5
- name: zk_timeout
description: |
DEPRECATED: Customizes the ZooKeeper connection timeout, in seconds.
value:
type: integer
example: 5
default: 5
- template: instances
options:
- name: kafka_connect_str
description: |
Kafka endpoints (host:port) to connect to.

In a production environment, it's often useful to specify multiple
Kafka nodes for a single check instance. This way you
only generate a single check process, but if one host goes down,
KafkaClient tries contacting the next host.
Details: https://github.com/DataDog/dd-agent/issues/2943
required: true
value:
type: array
items:
type: string
example:
- localhost:9092
- name: kafka_client_api_version
description: |
Specify the highest client protocol version supported by all brokers in the cluster.

This is a performance optimization. If this is not set, then the check automatically probes
the cluster for broker version during the connection bootstrapping process. Explicitly setting
this bypasses that probe, saving 3-5 network calls depending on the broker version. Note that
probing randomly picks a broker to probe, so in a mixed-version cluster, probing returns a
non-deterministic result.
value:
type: string
example: "2.3.0"
default: null
- name: consumer_groups
description: |
Each level is optional. Any empty values are fetched from the Kafka cluster.
You can have empty partitions (example: <CONSUMER_NAME_2>), topics (example: <CONSUMER_NAME_3>),
and even consumer_groups. If you omit consumer_groups, you must set `monitor_unlisted_consumer_groups` to true.

Deprecation notice: Omitting various levels works for zookeeper-based consumers. However, all
functionality related to fetching offsets from Zookeeper is deprecated.
value:
type: object
example:
<CONSUMER_NAME_1>:
<TOPIC_NAME_1>: [0, 1, 4, 12]
<CONSUMER_NAME_2>:
<TOPIC_NAME_2>: []
<CONSUMER_NAME_3>: {}
- name: monitor_unlisted_consumer_groups
description: |
Setting monitor_unlisted_consumer_groups to `true` tells the check to discover all consumer groups
and fetch all their known offsets. If this is not set to true, you must specify consumer_groups.

WARNING: This feature requires that your Kafka brokers be version >= 0.10.2. It is impossible to
support this feature on older brokers because they do not provide a way to determine the mapping
of consumer groups to topics. For details, see KIP-88. For older Kafka brokers, the consumer groups
must be specified. This requirement only applies to the brokers, not the consumers--they can be any version.

Deprecation notice: This feature works for Zookeeper-based consumers. However, all
functionality related to fetching offsets from Zookeeper is deprecated.
value:
type: boolean
example: false
- name: monitor_all_broker_highwatermarks
description: |
Setting monitor_all_broker_highwatermarks to `true` tells the check to
discover and fetch the broker highwater mark offsets for all Kafka topics in
the cluster. Otherwise, highwater mark offsets are only fetched for topic
partitions where that check run has already fetched a consumer offset. Internal
Kafka topics like __consumer_offsets, __transaction_state, etc. are always excluded.
value:
type: boolean
example: false
- name: tags
description: |
List of tags to attach to every metric and service check emitted by this integration.

Learn more about tagging at https://docs.datadoghq.com/tagging
value:
type: array
items:
type: string
example:
- <KEY_1>:<VALUE_1>
- <KEY_2>:<VALUE_2>
- name: security_protocol
description: |
Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
Default: PLAINTEXT.
default: PLAINTEXT
value:
type: string
example: PLAINTEXT
- name: sasl_mechanism
description: |
SASL mechanism to use when security_protocol is SASL_PLAINTEXT or SASL_SSL.
Valid values are: PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
value:
type: string
example: PLAIN
default: null
- name: sasl_plain_username
description: Username for SASL PLAIN or SCRAM authentication.
value:
type: string
example: <USERNAME>
- name: sasl_plain_password
description: Password for SASL PLAIN or SCRAM authentication.
value:
type: string
example: <PASSWORD>
- name: sasl_kerberos_service_name
description: Service name to include in the GSSAPI SASL mechanism handshake.
value:
type: string
example: kafka
- name: sasl_kerberos_domain_name
description: Kerberos domain name to use in the GSSAPI SASL mechanism handshake.
value:
type: string
example: localhost
default: null
- name: ssl_context
description: |
Pre-configured SSLContext for wrapping socket connections.
If provided, all other ssl_* configurations are ignored.
value:
type: string
example: <SSL_CONTEXT>
- name: ssl_check_hostname
description: |
Flag to configure whether the SSL handshake verifies that the
certificate matches the broker’s hostname.
default: true
value:
type: boolean
example: true
- name: ssl_cafile
description: Path to the CA file to use for certificate verification.
value:
type: string
example: <CA_FILE_PATH>
- name: ssl_certfile
description: |
Path to a file in PEM format containing the client certificate,
as well as any CA certificates needed to establish the certificate’s authenticity.
value:
type: string
example: <CERT_FILE_PATH>
- name: ssl_keyfile
description: Optional path to a file containing the client private key.
value:
type: string
example: <KEY_FILE_PATH>
- name: ssl_password
description: Password to be used when loading the certificate chain.
value:
type: string
example: <PASSWORD>
- name: ssl_crlfile
description: |
Path to a file containing the CRL to use to check certificate revocation.
By default, no CRL check is done. When providing a file, only the leaf certificate
is checked against this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
value:
type: string
example: <SSL_FILE_PATH>
- name: broker_requests_batch_size
description: |
The OffsetRequests sent to each broker are batched by the kafka_consumer check in groups of 30 by default.
If the batch size is too big, you may see KafkaTimeoutError exceptions in the logs while
running the wakeup calls.
If the batch size is too small, the check will take longer to run.
default: 30
value:
type: integer
example: 30
- name: zk_connect_str
description: |
DEPRECATION NOTICE: This option is only used for fetching consumer offsets
from Zookeeper and is deprecated.
Zookeeper endpoints (host:port) to connect to.
In a production environment, it's often useful to specify multiple
Zookeeper nodes for a single check instance. This way you
only generate a single check process, but if one host goes down,
KafkaClient / KazooClient tries contacting the next host.
Details: https://github.com/DataDog/dd-agent/issues/2943
value:
type: array
items:
type: string
example:
- localhost:2181
- <ZOOKEEPER_ENDPOINT>:2181
- name: zk_prefix
description: |
DEPRECATION NOTICE: This option is only used for fetching consumer offsets
from Zookeeper and is deprecated.
Zookeeper chroot prefix under which Kafka data lives in Zookeeper.
If Kafka connects to `my-zookeeper:2181/kafka`, then the `zk_prefix` is `/kafka`.
value:
type: string
example: <ZK_PREFIX>
- name: kafka_consumer_offsets
description: |
DEPRECATION NOTICE: This option is only used for fetching consumer offsets
from Zookeeper and is deprecated.
This setting only applies if zk_connect_str is set.
Set to true to fetch consumer offsets from both Zookeeper and Kafka.
Set to false to fetch consumer offsets only from Zookeeper.
value:
type: boolean
example: false
default: false
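
For context, the spec above renders into an agent-side kafka_consumer.yaml. Below is a minimal sketch of such a config, not part of this PR; the broker endpoints, consumer group, and topic names are placeholders.

```yaml
init_config:
  # Kafka connection timeout, in seconds (spec default: 5)
  kafka_timeout: 5

instances:
  - kafka_connect_str:            # placeholder broker endpoints
      - kafka-1.example.com:9092
      - kafka-2.example.com:9092
    # Optional: skip the broker version probe (saves 3-5 network calls)
    kafka_client_api_version: "2.3.0"
    consumer_groups:
      my_consumer_group:          # placeholder group name
        my_topic: [0, 1, 4]       # explicit partitions; [] fetches all partitions
    tags:
      - env:prod
```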
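If consumer_groups is omitted, the spec requires monitor_unlisted_consumer_groups instead. A sketch of that discovery-based setup, again with placeholder endpoints:

```yaml
instances:
  - kafka_connect_str:
      - kafka-1.example.com:9092
    # Discover all consumer groups; requires brokers >= 0.10.2 (KIP-88)
    monitor_unlisted_consumer_groups: true
    # Also fetch highwater marks for every topic, not only those with consumer offsets
    monitor_all_broker_highwatermarks: true
    # Note: the check still caps collection at 500 partitions
    # Lower this if KafkaTimeoutError shows up in the logs; raise it if runs are slow
    broker_requests_batch_size: 30
```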
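The security_protocol, sasl_*, and ssl_* options combine as follows for a SASL_SSL listener. Credentials and file paths here are placeholders, and a GSSAPI setup would swap the PLAIN/SCRAM fields for the sasl_kerberos_* options:

```yaml
instances:
  - kafka_connect_str:
      - kafka-1.example.com:9093    # placeholder TLS listener
    security_protocol: SASL_SSL
    sasl_mechanism: SCRAM-SHA-256
    sasl_plain_username: <USERNAME>
    sasl_plain_password: <PASSWORD>
    ssl_check_hostname: true
    ssl_cafile: /path/to/ca.pem     # placeholder certificate paths
    ssl_certfile: /path/to/client.pem
    ssl_keyfile: /path/to/client.key
    ssl_password: <KEY_PASSWORD>
```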
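Finally, the deprecated Zookeeper path described by zk_connect_str, zk_prefix, and kafka_consumer_offsets would look roughly like this; all endpoints and names are placeholders:

```yaml
instances:
  - kafka_connect_str:
      - kafka-1.example.com:9092
    # DEPRECATED: fetching consumer offsets from Zookeeper
    zk_connect_str:
      - zookeeper-1.example.com:2181
    zk_prefix: /kafka               # chroot, e.g. when Kafka uses my-zookeeper:2181/kafka
    kafka_consumer_offsets: true    # true: fetch offsets from both Zookeeper and Kafka
    consumer_groups:
      my_zk_consumer_group:         # placeholder group; empty topic map is filled from the cluster
        my_topic: []
```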