diff --git a/kafka_consumer/assets/configuration/spec.yaml b/kafka_consumer/assets/configuration/spec.yaml
index 58c0117df12a3..45e34d68cad17 100644
--- a/kafka_consumer/assets/configuration/spec.yaml
+++ b/kafka_consumer/assets/configuration/spec.yaml
@@ -10,11 +10,6 @@ files:
         WARNING: To avoid blindly collecting offsets and lag for an unbounded number
         of partitions (as could be the case after enabling monitor_unlisted_consumer_groups
         or monitor_all_broker_highwatermarks) the check collects metrics for at most 500 partitions.
-
-        DEPRECATION NOTICE: In the early days of Kafka, consumer offsets were stored in Zookeeper.
-        So this check currently supports fetching consumer offsets from both Kafka and Zookeeper.
-        However, Kafka 0.9 (released in 2015) deprecated Zookeeper storage. As a result, we have also
-        deprecated fetching consumer offsets from Zookeeper.
     options:
     - name: kafka_timeout
       description: Customizes the Kafka connection timeout.
@@ -22,13 +17,6 @@ files:
         type: integer
         example: 5
       display_default: 5
-    - name: zk_timeout
-      description: |
-        DEPRECATED: Customizes the ZooKeeper connection timeout.
-      value:
-        type: integer
-        example: 5
-      display_default: 5
     - template: init_config/default
   - template: instances
     options:
@@ -76,9 +64,6 @@ files:
           Each level is optional. Any empty values are fetched from the Kafka cluster.
           You can have empty partitions (example: <CONSUMER_NAME_2>), topics (example: <CONSUMER_NAME_3>),
           and even consumer_groups. If you omit consumer_groups, you must set `monitor_unlisted_consumer_groups` to true.
-
-          Deprecation notice: Omitting various levels works for zookeeper-based consumers. However, all
-          functionality related to fetching offsets from Zookeeper is deprecated.
         value:
           type: object
           example:
@@ -96,8 +81,6 @@ files:
           support this feature on older brokers because they do not provide a way to determine the mapping
           of consumer groups to topics. For details, see KIP-88. For older Kafka brokers, the consumer groups
           must be specified. This requirement only applies to the brokers, not the consumers--they can be any version.
-
-          Deprecation notice: Functionality related to consumers fetching offsets from Zookeeper is deprecated.
         value:
           type: boolean
           example: false
@@ -195,53 +178,6 @@ files:
           type: integer
           example: 30
       - template: instances/default
-      - name: zk_connect_str
-        description: |
-          DEPRECATION NOTICE: This option is only used for fetching consumer offsets
-          from Zookeeper and is deprecated.
-          Zookeeper endpoints and port to connect to.
-          In a production environment, it's often useful to specify multiple
-          Zookeeper nodes for a single check instance. This way you
-          only generate a single check process, but if one host goes down,
-          KafkaClient / KazooClient tries contacting the next host.
-          Details: https://github.com/DataDog/dd-agent/issues/2943
-
-          You may specify a single server like:
-
-            zk_connect_str: localhost:2181
-
-          or multiple servers like:
-
-            zk_connect_str:
-            - server1:2181
-            - server2:2181
-        value:
-          anyOf:
-          - type: string
-          - type: array
-            items:
-              type: string
-      - name: zk_prefix
-        description: |
-          DEPRECATION NOTICE: This option is only used for fetching consumer offsets
-          from Zookeeper and is deprecated.
-          Zookeeper chroot prefix under which kafka data is living in zookeeper.
-          If kafka is connecting to `my-zookeeper:2181/kafka` then the `zk_prefix` is `/kafka`.
-        value:
-          type: string
-          example: <ZK_PREFIX>
-      - name: kafka_consumer_offsets
-        description: |
-          DEPRECATION NOTICE: This option is only used for fetching consumer offsets
-          from Zookeeper and is deprecated.
-          This setting only applies if `zk_connect_str` is set and cannot work with
-          `monitor_unlisted_consumer_groups` since the generated list comes from Zookeeper.
-          Set to true to fetch consumer offsets from both Zookeeper and Kafka
-          Set to false to fetch consumer offsets only from Zookeeper.
-        value:
-          type: boolean
-          example: false
-        display_default: false
       - name: data_streams_enabled
         description: |
           Beta feature to get a lag metric in Seconds. It's part of the Data Streams Monitoring Product
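After this change, the options documented in the spec describe a Kafka-only setup. A minimal sketch of an instance using only the remaining options, assuming 'localhost:9092' and the group/topic/partition names are placeholders rather than values taken from this repository:

    # Hypothetical instance dict limited to the options kept by this change;
    # the broker address and consumer group layout are illustrative only.
    instance = {
        'kafka_connect_str': 'localhost:9092',
        'consumer_groups': {'my_consumer': {'marvel': [0]}},
        'monitor_unlisted_consumer_groups': False,
        'data_streams_enabled': False,
    }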
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py b/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py
index 6ee583a5118c3..f5fb94ae1a8e9 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py
@@ -18,10 +18,6 @@ def shared_service(field, value):
     return get_default_field_value(field, value)
 
 
-def shared_zk_timeout(field, value):
-    return 5
-
-
 def instance_broker_requests_batch_size(field, value):
     return 30
 
@@ -46,10 +42,6 @@ def instance_kafka_client_api_version(field, value):
     return get_default_field_value(field, value)
 
 
-def instance_kafka_consumer_offsets(field, value):
-    return False
-
-
 def instance_metric_patterns(field, value):
     return get_default_field_value(field, value)
 
@@ -128,11 +120,3 @@ def instance_tls_validate_hostname(field, value):
 
 def instance_tls_verify(field, value):
     return True
-
-
-def instance_zk_connect_str(field, value):
-    return get_default_field_value(field, value)
-
-
-def instance_zk_prefix(field, value):
-    return get_default_field_value(field, value)
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py b/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py
index ac961c3172d31..34f471e45d6d4 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py
@@ -47,7 +47,6 @@ class Config:
     empty_default_hostname: Optional[bool]
     kafka_client_api_version: Optional[str]
     kafka_connect_str: Union[str, Sequence[str]]
-    kafka_consumer_offsets: Optional[bool]
     metric_patterns: Optional[MetricPatterns]
     min_collection_interval: Optional[float]
     monitor_all_broker_highwatermarks: Optional[bool]
@@ -68,8 +67,6 @@ class Config:
     tls_private_key_password: Optional[str]
     tls_validate_hostname: Optional[bool]
     tls_verify: Optional[bool]
-    zk_connect_str: Optional[Union[str, Sequence[str]]]
-    zk_prefix: Optional[str]
 
     @root_validator(pre=True)
     def _initial_validation(cls, values):
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config_models/shared.py b/kafka_consumer/datadog_checks/kafka_consumer/config_models/shared.py
index e6741408b7c96..3183c3d22ad97 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/config_models/shared.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/config_models/shared.py
@@ -25,7 +25,6 @@ class Config:
 
     kafka_timeout: Optional[int]
     service: Optional[str]
-    zk_timeout: Optional[int]
 
     @root_validator(pre=True)
     def _initial_validation(cls, values):
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example
index 37510199ba0be..99f4bbd9207d0 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example
+++ b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example
@@ -3,11 +3,6 @@
 ## WARNING: To avoid blindly collecting offsets and lag for an unbounded number
 ## of partitions (as could be the case after enabling monitor_unlisted_consumer_groups
 ## or monitor_all_broker_highwatermarks) the check collects metrics for at most 500 partitions.
-##
-## DEPRECATION NOTICE: In the early days of Kafka, consumer offsets were stored in Zookeeper.
-## So this check currently supports fetching consumer offsets from both Kafka and Zookeeper.
-## However, Kafka 0.9 (released in 2015) deprecated Zookeeper storage. As a result, we have also
-## deprecated fetching consumer offsets from Zookeeper.
 #
 init_config:
 
@@ -16,11 +11,6 @@ init_config:
     #
     # kafka_timeout: 5
 
-    ## @param zk_timeout - integer - optional - default: 5
-    ## DEPRECATED: Customizes the ZooKeeper connection timeout.
-    #
-    # zk_timeout: 5
-
     ## @param service - string - optional
     ## Attach the tag `service:<SERVICE>` to every metric, event, and service check emitted by this integration.
     ##
@@ -68,9 +58,6 @@ instances:
     ## Each level is optional. Any empty values are fetched from the Kafka cluster.
     ## You can have empty partitions (example: <CONSUMER_NAME_2>), topics (example: <CONSUMER_NAME_3>),
     ## and even consumer_groups. If you omit consumer_groups, you must set `monitor_unlisted_consumer_groups` to true.
-    ##
-    ## Deprecation notice: Omitting various levels works for zookeeper-based consumers. However, all
-    ## functionality related to fetching offsets from Zookeeper is deprecated.
     #
     # consumer_groups:
     #   <CONSUMER_NAME_1>:
@@ -91,8 +78,6 @@ instances:
     ## support this feature on older brokers because they do not provide a way to determine the mapping
     ## of consumer groups to topics. For details, see KIP-88. For older Kafka brokers, the consumer groups
     ## must be specified. This requirement only applies to the brokers, not the consumers--they can be any version.
-    ##
-    ## Deprecation notice: Functionality related to consumers fetching offsets from Zookeeper is deprecated.
     #
     # monitor_unlisted_consumer_groups: false
 
@@ -256,46 +241,6 @@ instances:
     #   exclude:
     #   - <EXCLUDE_REGEX>
 
-    ## @param zk_connect_str - string or list of strings - optional
-    ## DEPRECATION NOTICE: This option is only used for fetching consumer offsets
-    ## from Zookeeper and is deprecated.
-    ## Zookeeper endpoints and port to connect to.
-    ## In a production environment, it's often useful to specify multiple
-    ## Zookeeper nodes for a single check instance. This way you
-    ## only generate a single check process, but if one host goes down,
-    ## KafkaClient / KazooClient tries contacting the next host.
-    ## Details: https://github.com/DataDog/dd-agent/issues/2943
-    ##
-    ## You may specify a single server like:
-    ##
-    ##   zk_connect_str: localhost:2181
-    ##
-    ## or multiple servers like:
-    ##
-    ##   zk_connect_str:
-    ##   - server1:2181
-    ##   - server2:2181
-    #
-    # zk_connect_str: <ZK_CONNECT_STR>
-
-    ## @param zk_prefix - string - optional
-    ## DEPRECATION NOTICE: This option is only used for fetching consumer offsets
-    ## from Zookeeper and is deprecated.
-    ## Zookeeper chroot prefix under which kafka data is living in zookeeper.
-    ## If kafka is connecting to `my-zookeeper:2181/kafka` then the `zk_prefix` is `/kafka`.
-    #
-    # zk_prefix: <ZK_PREFIX>
-
-    ## @param kafka_consumer_offsets - boolean - optional - default: false
-    ## DEPRECATION NOTICE: This option is only used for fetching consumer offsets
-    ## from Zookeeper and is deprecated.
-    ## This setting only applies if `zk_connect_str` is set and cannot work with
-    ## `monitor_unlisted_consumer_groups` since the generated list comes from Zookeeper.
-    ## Set to true to fetch consumer offsets from both Zookeeper and Kafka
-    ## Set to false to fetch consumer offsets only from Zookeeper.
-    #
-    # kafka_consumer_offsets: false
-
     ## @param data_streams_enabled - boolean - optional - default: false
     ## Beta feature to get a lag metric in Seconds. It's part of the Data Streams Monitoring Product
     #
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
index bc648ba77d5ce..a83376dd3a768 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -12,7 +12,6 @@
 from datadog_checks.base.utils.http import AuthTokenOAuthReader
 
 from .constants import CONTEXT_UPPER_BOUND, DEFAULT_KAFKA_TIMEOUT
-from .legacy_0_10_2 import LegacyKafkaCheck_0_10_2
 from .new_kafka_consumer import NewKafkaConsumerCheck
 
 
@@ -40,7 +39,6 @@ class KafkaCheck(AgentCheck):
 
     def __init__(self, name, init_config, instances):
         super(KafkaCheck, self).__init__(name, init_config, instances)
-        self.sub_check = None
         self._context_limit = int(self.init_config.get('max_partition_contexts', CONTEXT_UPPER_BOUND))
         self._data_streams_enabled = is_affirmative(self.instance.get('data_streams_enabled', False))
         self._custom_tags = self.instance.get('tags', [])
@@ -52,7 +50,7 @@ def __init__(self, name, init_config, instances):
         )
         self._consumer_groups = self.instance.get('consumer_groups', {})
 
-        self.check_initializations.append(self._init_check_based_on_kafka_version)
+        self.sub_check = NewKafkaConsumerCheck(self)
 
     def check(self, _):
         return self.sub_check.check()
@@ -93,47 +91,6 @@ def validate_consumer_groups(self):
                         for partition in partitions:
                             assert isinstance(partition, int)
 
-    def _init_check_based_on_kafka_version(self):
-        """Set the sub_check attribute before allowing the `check` method to run. If something fails, this method will
-        be retried regularly."""
-        self.sub_check = self._make_sub_check()
-
-    def _make_sub_check(self):
-        """Determine whether to use old legacy KafkaClient implementation or the new KafkaAdminClient implementation.
-
-        The legacy version of this check uses the KafkaClient and handrolls things like looking up the GroupCoordinator,
-        crafting the offset requests, handling errors, etc.
-
-        The new implementation uses the KafkaAdminClient which lets us offload most of the Kafka-specific bits onto the
-        kafka-python library, which is used by many other tools and reduces our maintenance burden.
-
-        Unfortunately, the KafkaAdminClient requires brokers >= 0.10.0, so we split the check into legacy and new code.
-
-        Furthermore, we took the opportunity to simplify the new code by dropping support for:
-        1) Zookeeper-based offsets. These have been deprecated since Kafka 0.9.
-        2) Kafka brokers < 0.10.2. It is impossible to support monitor_unlisted_consumer_groups on these older brokers
-        because they do not provide a way to determine the mapping of consumer groups to topics. For details, see
-        KIP-88.
-
-        To clarify: This check still allows fetching offsets from zookeeper/older kafka brokers, it just uses the
-        legacy code path."""
-        if self.instance.get('zk_connect_str'):
-            return LegacyKafkaCheck_0_10_2(self)
-
-        kafka_version = self.instance.get('kafka_client_api_version')
-        if isinstance(kafka_version, str):
-            kafka_version = tuple(map(int, kafka_version.split(".")))
-        if kafka_version is None:  # if unspecified by the user, we have to probe the cluster
-            kafka_client = self.create_kafka_client()
-            # version probing happens automatically as part of KafkaClient's __init__()
-            kafka_version = kafka_client.config['api_version']
-            kafka_client.close()
-
-        if kafka_version < (0, 10, 2):
-            return LegacyKafkaCheck_0_10_2(self)
-
-        return NewKafkaConsumerCheck(self)
-
     def _create_kafka_client(self, clazz):
         kafka_connect_str = self.instance.get('kafka_connect_str')
         if not isinstance(kafka_connect_str, (string_types, list)):
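With the legacy path gone, the check no longer probes the broker API version or branches on zk_connect_str; it always builds the KafkaAdminClient-based implementation. A condensed sketch of the two methods touched above, with everything else in the class elided:

    # Condensed from the '+' lines above; absolute imports are used so the
    # sketch stands alone (the real module imports NewKafkaConsumerCheck relatively).
    from datadog_checks.base import AgentCheck
    from datadog_checks.kafka_consumer.new_kafka_consumer import NewKafkaConsumerCheck


    class KafkaCheck(AgentCheck):
        def __init__(self, name, init_config, instances):
            super(KafkaCheck, self).__init__(name, init_config, instances)
            # No version probing or Zookeeper branch anymore: the new
            # implementation is constructed unconditionally.
            self.sub_check = NewKafkaConsumerCheck(self)

        def check(self, _):
            # Every run delegates to the single remaining implementation.
            return self.sub_check.check()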
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py b/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py
deleted file mode 100644
index 76cfe22e3c9ec..0000000000000
--- a/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py
+++ /dev/null
@@ -1,428 +0,0 @@
-# (C) Datadog, Inc. 2019-present
-# All rights reserved
-# Licensed under Simplified BSD License (see LICENSE)
-from __future__ import division
-
-from collections import defaultdict
-
-from kafka import errors as kafka_errors
-from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
-from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
-from kazoo.client import KazooClient
-from kazoo.exceptions import NoNodeError
-from six import string_types
-
-from datadog_checks.base import ConfigurationError, is_affirmative
-
-from .constants import KAFKA_INTERNAL_TOPICS
-
-
-class LegacyKafkaCheck_0_10_2(object):
-    """
-    Check the offsets and lag of Kafka consumers. This check also returns broker highwater offsets.
-
-    This is the legacy codepath which is used when either broker version < 0.10.2 or zk_connect_str has a value.
-    """
-
-    def __init__(self, parent_check):
-        self._parent_check = parent_check
-        self._kafka_client = None
-
-        # Note: We cannot skip validation if monitor_unlisted_consumer_groups is True because this legacy check only
-        # supports that functionality for Zookeeper, not Kafka.
-        self.validate_consumer_groups()
-
-        self._zk_hosts_ports = self.instance.get('zk_connect_str')
-
-        # If we are collecting from Zookeeper, then create a long-lived zk client
-        if self._zk_hosts_ports is not None:
-
-            # any chroot prefix gets appended onto the host string or the last item on the host list
-            chroot = self.instance.get('zk_prefix')
-            if chroot is not None:
-                if isinstance(self._zk_hosts_ports, string_types):
-                    self._zk_hosts_ports += chroot
-                elif isinstance(self._zk_hosts_ports, list):
-                    self._zk_hosts_ports.append(chroot)
-                else:
-                    raise ConfigurationError("zk_connect_str must be a string or list of strings")
-
-            self._zk_client = KazooClient(
-                hosts=self._zk_hosts_ports, timeout=int(self.init_config.get('zk_timeout', 5))
-            )
-            self._zk_client.start()
-
-    def __getattr__(self, item):
-        try:
-            return getattr(self._parent_check, item)
-        except AttributeError:
-            raise AttributeError("LegacyKafkaCheck_0_10_2 has no attribute called {}".format(item))
-
-    @property
-    def kafka_client(self):
-        if self._kafka_client is None:
-            self._kafka_client = self._create_kafka_client()
-        return self._kafka_client
-
-    def check(self):
-        """The main entrypoint of the check."""
-        self.log.debug("Running legacy Kafka Consumer check.")
-        self._zk_consumer_offsets = {}  # Expected format: {(consumer_group, topic, partition): offset}
-        self._kafka_consumer_offsets = {}  # Expected format: {(consumer_group, topic, partition): offset}
-        self._highwater_offsets = {}  # Expected format: {(topic, partition): offset}
-        contexts_limit = self._context_limit
-
-        # For calculating consumer lag, we have to fetch both the consumer offset and the broker highwater offset.
-        # There's a potential race condition because whichever one we check first may be outdated by the time we check
-        # the other. Better to check consumer offsets before checking broker offsets because worst case is that
-        # overstates consumer lag a little. Doing it the other way can understate consumer lag to the point of having
-        # negative consumer lag, which just creates confusion because it's theoretically impossible.
-
-        # Fetch consumer group offsets from Zookeeper
-        if self._zk_hosts_ports is not None:
-            try:
-                self._get_zk_consumer_offsets(contexts_limit)
-                contexts_limit -= len(self._zk_consumer_offsets)
-            except Exception:
-                self.log.exception("There was a problem collecting consumer offsets from Zookeeper.")
-                # don't raise because we might get valid broker offsets
-
-        # Fetch consumer group offsets from Kafka
-        # Support for storing offsets in Kafka was not available until Kafka 0.8.2. Also, for legacy reasons, this check
-        # only fetches consumer offsets from Kafka if Zookeeper is omitted or kafka_consumer_offsets is True.
-        if self.kafka_client.config.get('api_version') >= (0, 8, 2) and is_affirmative(
-            self.instance.get('kafka_consumer_offsets', self._zk_hosts_ports is None)
-        ):
-            try:
-                self.log.debug('Collecting consumer offsets')
-                self._get_kafka_consumer_offsets(contexts_limit)
-                contexts_limit -= len(self._kafka_consumer_offsets)
-            except Exception:
-                self.log.exception("There was a problem collecting consumer offsets from Kafka.")
-                # don't raise because we might get valid broker offsets
-        else:
-            self.log.debug(
-                'Identified api_version: %s, kafka_consumer_offsets: %s, zk_connection_string: %s.'
-                ' Skipping consumer offset collection',
-                str(self.kafka_client.config.get('api_version')),
-                str(self.instance.get('kafka_consumer_offsets')),
-                str(self._zk_hosts_ports),
-            )
-
-        # Fetch the broker highwater offsets
-        try:
-            self._get_highwater_offsets(contexts_limit)
-        except Exception:
-            self.log.exception('There was a problem collecting the highwater mark offsets')
-            # Unlike consumer offsets, fail immediately because we can't calculate consumer lag w/o highwater_offsets
-            raise
-
-        total_contexts = sum(
-            [len(self._zk_consumer_offsets), len(self._kafka_consumer_offsets), len(self._highwater_offsets)]
-        )
-        if total_contexts > self._context_limit:
-            self.warning(
-                """Discovered %s metric contexts - this exceeds the maximum number of %s contexts permitted by the
-                check. Please narrow your target by specifying in your kafka_consumer.yaml the consumer groups, topics
-                and partitions you wish to monitor.""",
-                total_contexts,
-                self._context_limit,
-            )
-        # Report the metrics
-        self._report_highwater_offsets()
-        self._report_consumer_offsets_and_lag(self._kafka_consumer_offsets)
-        # if someone is in the middle of migrating their offset storage from zookeeper to kafka,
-        # they need to identify which source is reporting which offsets. So we tag zookeeper with 'source:zk'
-        self._report_consumer_offsets_and_lag(self._zk_consumer_offsets, source='zk')
-
-    def _create_kafka_client(self):
-        kafka_client = self.create_kafka_client()
-        # Force initial population of the local cluster metadata cache
-        kafka_client.poll(future=kafka_client.cluster.request_update())
-        if kafka_client.cluster.topics(exclude_internal_topics=False) is None:
-            raise RuntimeError("Local cluster metadata cache did not populate.")
-        return kafka_client
-
-    def _make_blocking_req(self, request, node_id=None):
-        if node_id is None:
-            node_id = self.kafka_client.least_loaded_node()
-
-        while not self.kafka_client.ready(node_id):
-            # poll until the connection to broker is ready, otherwise send() will fail with NodeNotReadyError
-            self.kafka_client.poll()
-
-        future = self.kafka_client.send(node_id, request)
-        self.kafka_client.poll(future=future)  # block until we get response.
-        if future.failed():
-            raise future.exception  # pylint: disable-msg=raising-bad-type
-        response = future.value
-        return response
-
-    def _get_highwater_offsets(self, contexts_limit):
-        """
-        Fetch highwater offsets for topic_partitions in the Kafka cluster.
-
-        If monitor_all_broker_highwatermarks is True, will fetch for all partitions in the cluster. Otherwise highwater
-        mark offsets will only be fetched for topic partitions where this check run has already fetched a consumer
-        offset.
-
-        Internal Kafka topics like __consumer_offsets, __transaction_state, etc are always excluded.
-
-        Any partitions that don't currently have a leader will be skipped.
-
-        Sends one OffsetRequest per broker to get offsets for all partitions where that broker is the leader:
-        https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset)
-        """
-        # If we aren't fetching all broker highwater offsets, then construct the unique set of topic partitions for
-        # which this run of the check has at least once saved consumer offset. This is later used as a filter for
-        # excluding partitions.
-        if not self._monitor_all_broker_highwatermarks:
-            tps_with_consumer_offset = {(topic, partition) for (_, topic, partition) in self._kafka_consumer_offsets}
-            tps_with_consumer_offset.update({(topic, partition) for (_, topic, partition) in self._zk_consumer_offsets})
-
-        for broker in self.kafka_client.cluster.brokers():
-            if len(self._highwater_offsets) >= contexts_limit:
-                self.warning("Context limit reached. Skipping highwater offsets collection.")
-                return
-            broker_led_partitions = self.kafka_client.cluster.partitions_for_broker(broker.nodeId)
-            if broker_led_partitions is None:
-                continue
-            # Take the partitions for which this broker is the leader and group them by topic in order to construct the
-            # OffsetRequest while simultaneously filtering out partitions we want to exclude
-            partitions_grouped_by_topic = defaultdict(list)
-            for topic, partition in broker_led_partitions:
-                # No sense fetching highwater offsets for internal topics
-                if topic not in KAFKA_INTERNAL_TOPICS and (
-                    self._monitor_all_broker_highwatermarks or (topic, partition) in tps_with_consumer_offset
-                ):
-                    partitions_grouped_by_topic[topic].append(partition)
-
-            # Construct the OffsetRequest
-            max_offsets = 1
-            request = OffsetRequest[0](
-                replica_id=-1,
-                topics=[
-                    (topic, [(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions])
-                    for topic, partitions in partitions_grouped_by_topic.items()
-                ],
-            )
-            response = self._make_blocking_req(request, node_id=broker.nodeId)
-            self._process_highwater_offsets(response, contexts_limit)
-
-    def _process_highwater_offsets(self, response, contexts_limit):
-        """Parse an OffsetFetchResponse and save it to the highwater_offsets dict."""
-        if type(response) not in OffsetResponse:
-            raise RuntimeError("response type should be OffsetResponse, but instead was %s." % type(response))
-        for topic, partitions_data in response.topics:
-            for partition, error_code, offsets in partitions_data:
-                error_type = kafka_errors.for_code(error_code)
-                if error_type is kafka_errors.NoError:
-                    self._highwater_offsets[(topic, partition)] = offsets[0]
-                    if len(self._highwater_offsets) >= contexts_limit:
-                        self.warning("Context limit reached. Skipping highwater offsets processing.")
-                        return
-                elif error_type is kafka_errors.NotLeaderForPartitionError:
-                    self.log.warning(
-                        "Kafka broker returned %s (error_code %s) for topic %s, partition: %s. This should only happen "
-                        "if the broker that was the partition leader when kafka_admin_client last fetched metadata is "
-                        "no longer the leader.",
-                        error_type.message,
-                        error_type.errno,
-                        topic,
-                        partition,
-                    )
-                    self.kafka_client.cluster.request_update()  # force metadata update on next poll()
-                elif error_type is kafka_errors.UnknownTopicOrPartitionError:
-                    self.log.warning(
-                        "Kafka broker returned %s (error_code %s) for topic: %s, partition: %s. This should only "
-                        "happen if the topic is currently being deleted or the check configuration lists non-existent "
-                        "topic partitions.",
-                        error_type.message,
-                        error_type.errno,
-                        topic,
-                        partition,
-                    )
-                else:
-                    raise error_type(
-                        "Unexpected error encountered while attempting to fetch the highwater offsets for topic: %s, "
-                        "partition: %s." % (topic, partition)
-                    )
-
-    def _report_highwater_offsets(self):
-        """Report the broker highwater offsets."""
-        for (topic, partition), highwater_offset in self._highwater_offsets.items():
-            broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
-            broker_tags.extend(self._custom_tags)
-            self.gauge('broker_offset', highwater_offset, tags=broker_tags)
-
-    def _report_consumer_offsets_and_lag(self, consumer_offsets, **kwargs):
-        """Report the consumer group offsets and consumer lag."""
-        for (consumer_group, topic, partition), consumer_offset in consumer_offsets.items():
-            consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group]
-            if 'source' in kwargs:
-                consumer_group_tags.append('source:%s' % kwargs['source'])
-            consumer_group_tags.extend(self._custom_tags)
-
-            partitions = self.kafka_client.cluster.partitions_for_topic(topic)
-            if partitions is not None and partition in partitions:
-                # report consumer offset if the partition is valid because even if leaderless the consumer offset will
-                # be valid once the leader failover completes
-                self.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags)
-
-                if (topic, partition) not in self._highwater_offsets:
-                    self.log.warning(
-                        "Consumer group: %s has offsets for topic: %s partition: %s, but no stored highwater offset "
-                        "(likely the partition is in the middle of leader failover) so cannot calculate consumer lag.",
-                        consumer_group,
-                        topic,
-                        partition,
-                    )
-                    continue
-
-                consumer_lag = self._highwater_offsets[(topic, partition)] - consumer_offset
-                self.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags)
-
-                if consumer_lag < 0:  # this will effectively result in data loss, so emit an event for max visibility
-                    title = "Negative consumer lag for group: {}.".format(consumer_group)
-                    message = (
-                        "Consumer group: {}, topic: {}, partition: {} has negative consumer lag. This should never "
-                        "happen and will result in the consumer skipping new messages until the lag turns "
-                        "positive.".format(consumer_group, topic, partition)
-                    )
-                    key = "{}:{}:{}".format(consumer_group, topic, partition)
-                    self.send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error")
-                    self.log.debug(message)
-
-            else:
-                if partitions is None:
-                    msg = (
-                        "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions "
-                        "in the cluster, so skipping reporting these offsets."
-                    )
-                else:
-                    msg = (
-                        "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't "
-                        "included in the cluster partitions, so skipping reporting these offsets."
-                    )
-                self.log.warning(msg, consumer_group, topic, partition)
-                self.kafka_client.cluster.request_update()  # force metadata update on next poll()
-
-    def _get_zk_path_children(self, zk_path, name_for_error):
-        """Fetch child nodes for a given Zookeeper path."""
-        children = []
-        try:
-            children = self._zk_client.get_children(zk_path)
-        except NoNodeError:
-            self.log.info('No zookeeper node at %s', zk_path)
-        except Exception:
-            self.log.exception('Could not read %s from %s', name_for_error, zk_path)
-        return children
-
-    def _get_zk_consumer_offsets(self, contexts_limit):
-        """
-        Fetch Consumer Group offsets from Zookeeper.
-
-        Also fetch consumer_groups, topics, and partitions if not
-        already specified in consumer_groups.
-        """
-        # Construct the Zookeeper path pattern
-        # /consumers/[groupId]/offsets/[topic]/[partitionId]
-        zk_path_consumer = '/consumers/'
-        zk_path_topic_tmpl = zk_path_consumer + '{group}/offsets/'
-        zk_path_partition_tmpl = zk_path_topic_tmpl + '{topic}/'
-
-        if self._monitor_unlisted_consumer_groups:
-            # don't overwrite self._consumer_groups because that holds the static config values which are always used
-            # when fetching consumer offsets from Kafka. Also, these dynamically fetched groups may change on each run.
-            consumer_groups = {
-                consumer_group: None
-                for consumer_group in self._get_zk_path_children(zk_path_consumer, 'consumer groups')
-            }
-        else:
-            consumer_groups = self._consumer_groups
-
-        for consumer_group, topics in consumer_groups.items():
-            if not topics:  # If topics aren't specified, fetch them from ZK
-                zk_path_topics = zk_path_topic_tmpl.format(group=consumer_group)
-                topics = {topic: None for topic in self._get_zk_path_children(zk_path_topics, 'topics')}
-
-            for topic, partitions in topics.items():
-                if not partitions:  # If partitions aren't specified, fetch them from ZK
-                    zk_path_partitions = zk_path_partition_tmpl.format(group=consumer_group, topic=topic)
-                    # Zookeeper returns the partition IDs as strings because they are extracted from the node path
-                    partitions = [int(x) for x in self._get_zk_path_children(zk_path_partitions, 'partitions')]
-
-                for partition in partitions:
-                    zk_path = (zk_path_partition_tmpl + '{partition}/').format(
-                        group=consumer_group, topic=topic, partition=partition
-                    )
-                    try:
-                        consumer_offset = int(self._zk_client.get(zk_path)[0])
-                        key = (consumer_group, topic, partition)
-                        self._zk_consumer_offsets[key] = consumer_offset
-
-                        if len(self._zk_consumer_offsets) >= contexts_limit:
-                            self.warning("Context limit reached. Skipping zk consumer offsets collection.")
-                            return
-                    except NoNodeError:
-                        self.log.info('No zookeeper node at %s', zk_path)
-                        continue
-                    except Exception:
-                        self.log.exception('Could not read consumer offset from %s', zk_path)
-
-    def _get_kafka_consumer_offsets(self, contexts_limit):
-        """
-        Fetch Consumer Group offsets from Kafka.
-
-        These offsets are stored in the __consumer_offsets topic rather than in Zookeeper.
-        """
-        for consumer_group, topic_partitions in self._consumer_groups.items():
-            if not topic_partitions:
-                raise ConfigurationError(
-                    'Invalid configuration - if you are collecting consumer offsets from Kafka, and your brokers are '
-                    'older than 0.10.2, then you _must_ specify consumer groups and their topics. Older brokers lack '
-                    'the necessary protocol support to determine which topics a consumer is consuming. See KIP-88 for '
-                    'details.'
-                )
-            try:  # catch exceptions on a group-by-group basis so that if one fails we still fetch the other groups
-                for topic, partitions in topic_partitions.items():
-                    if not partitions:
-                        # If partitions omitted, then we assume the group is consuming all partitions for the topic.
-                        # Fetch consumer offsets even for unavailable partitions because those will be valid once the
-                        # partition finishes leader failover.
-                        topic_partitions[topic] = self.kafka_client.cluster.partitions_for_topic(topic)
-
-                coordinator_id = self._get_group_coordinator(consumer_group)
-                if coordinator_id is not None:
-                    # Kafka protocol uses OffsetFetchRequests to retrieve consumer offsets:
-                    # https://kafka.apache.org/protocol#The_Messages_OffsetFetch
-                    # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetFetchRequest
-                    request = OffsetFetchRequest[1](consumer_group, list(topic_partitions.items()))
-                    response = self._make_blocking_req(request, node_id=coordinator_id)
-                    for (topic, partition_offsets) in response.topics:
-                        for partition, offset, _metadata, error_code in partition_offsets:
-                            error_type = kafka_errors.for_code(error_code)
-                            # If the OffsetFetchRequest explicitly specified partitions, the offset could be returned as
-                            # -1, meaning there is no recorded offset for that partition... for example, if the
-                            # partition doesn't exist in the cluster. So ignore it.
-                            if offset == -1 or error_type is not kafka_errors.NoError:
-                                self.kafka_client.cluster.request_update()  # force metadata update on next poll()
-                                continue
-                            key = (consumer_group, topic, partition)
-                            self._kafka_consumer_offsets[key] = offset
-
-                            if len(self._kafka_consumer_offsets) >= contexts_limit:
-                                self.warning("Context limit reached. Skipping kafka consumer offsets collection.")
-                                return
-                else:
-                    self.log.info("unable to find group coordinator for %s", consumer_group)
-            except Exception:
-                self.log.exception('Could not read consumer offsets from Kafka for group: %s', consumer_group)
-
-    def _get_group_coordinator(self, group):
-        """Determine which broker is the Group Coordinator for a specific consumer group."""
-        request = GroupCoordinatorRequest[0](group)
-        response = self._make_blocking_req(request)
-        error_type = kafka_errors.for_code(response.error_code)
-        if error_type is kafka_errors.NoError:
-            return response.coordinator_id
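The deleted module's comments also record why consumer offsets were fetched before broker highwater offsets (worst case is slightly overstated lag rather than an impossible negative value) and how lag was derived. A simplified paraphrase of that reporting math, where gauge() and send_event() are stand-ins for the real AgentCheck methods rather than its actual API:

    # Paraphrase of the deleted _report_consumer_offsets_and_lag logic.
    def report_lag(gauge, send_event, highwater_offsets, consumer_offsets, tags):
        for (group, topic, partition), consumer_offset in consumer_offsets.items():
            highwater = highwater_offsets.get((topic, partition))
            if highwater is None:
                # No stored highwater mark (e.g. mid leader failover), so lag
                # cannot be computed for this partition.
                continue
            lag = highwater - consumer_offset
            gauge('consumer_lag', lag, tags=tags)
            if lag < 0:
                # Negative lag implies the consumer will skip messages, so the
                # legacy check emitted an error event for visibility.
                send_event('Negative consumer lag for group: {}'.format(group), tags)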
diff --git a/kafka_consumer/hatch.toml b/kafka_consumer/hatch.toml
index 95c0bd883746a..b6667c74d50e7 100644
--- a/kafka_consumer/hatch.toml
+++ b/kafka_consumer/hatch.toml
@@ -1,30 +1,16 @@
 [env.collectors.datadog-checks]
 
-[[envs.default.matrix]]
-python = ["2.7", "3.8"]
-# Note: 0.9 is for testing legacy pre-0.10.2 implementation
-version = ["0.9", "3.3"]
-offset-storage = ["kafka", "zk"]
-
 [[envs.default.matrix]]
 python = ["3.8"]
-version = ["0.11", "1.1", "2.3"]
-offset-storage = ["kafka", "zk"]
+version = ["1.1", "2.3", "3.3"]
 
 [envs.default.overrides]
-matrix.version.e2e-env = { value = true, if = ["0.9", "3.3"] }
+matrix.version.e2e-env = { value = true, if = ["3.3"] }
 matrix.version.env-vars = [
-  { key = "KAFKA_VERSION", value = "0.9.0.1-1", if = ["0.9"] },
-  { key = "KAFKA_VERSION", value = "0.11.0.1", if = ["0.11"] },
   { key = "KAFKA_VERSION", value = "1.1.1", if = ["1.1"] },
   { key = "KAFKA_VERSION", value = "2.3.1", if = ["2.3"] },
   { key = "KAFKA_VERSION", value = "3.3.2", if = ["3.3"] },
-  # Using Kafka 0.9 + with ZK 3.4 due to flakiness of Kafka 0.9 + ZK 3.6
-  { key = "ZK_VERSION", value = "3.4.11", if = ["0.9"] },
-  { key = "ZK_VERSION", value = "3.6.4", if = ["0.11", "1.1", "2.3", "3.3"] },
-  { key = "USE_MULTIPLE_BROKERS", value = "true", if = ["1.1", "2.3", "3.3"] },
-  # Can't have a working `docker compose` for these versions
-  { key = "USE_MULTIPLE_BROKERS", value = "false", if = ["0.9", "0.11"] },
+  "ZK_VERSION=3.6.4"
 ]
 matrix.offset-storage.env-vars = [
   { key = "KAFKA_OFFSETS_STORAGE", value = "kafka", if = ["kafka"] },
@@ -38,7 +24,4 @@ dependencies = [
 e2e-env = false
 
 [envs.latest.env-vars]
-KAFKA_OFFSETS_STORAGE = "kafka"
-KAFKA_VERSION = "latest"
-USE_MULTIPLE_BROKERS = "true"
-ZK_VERSION = "3.6.4"
+KAFKA_VERSION = "latest"
\ No newline at end of file
diff --git a/kafka_consumer/tests/common.py b/kafka_consumer/tests/common.py
index 0d31b6d943137..a037094aab715 100644
--- a/kafka_consumer/tests/common.py
+++ b/kafka_consumer/tests/common.py
@@ -4,7 +4,6 @@
 import os
 import socket
 
-from datadog_checks.base import is_affirmative
 from datadog_checks.dev import get_docker_hostname
 
 HERE = os.path.dirname(os.path.abspath(__file__))
@@ -14,28 +13,5 @@
 ZK_CONNECT_STR = '{}:2181'.format(HOST)
 TOPICS = ['marvel', 'dc']
 PARTITIONS = [0, 1]
-USE_MULTIPLE_BROKERS = is_affirmative(os.environ['USE_MULTIPLE_BROKERS'])
-if USE_MULTIPLE_BROKERS:
-    DOCKER_IMAGE_PATH = os.path.join(HERE, 'docker', 'multiple-brokers.yaml')
-else:
-    DOCKER_IMAGE_PATH = os.path.join(HERE, 'docker', 'single-broker.yaml')
+DOCKER_IMAGE_PATH = os.path.join(HERE, 'docker', 'multiple-brokers.yaml')
 KAFKA_VERSION = os.environ.get('KAFKA_VERSION')
-
-
-def is_supported(flavor):
-    """
-    Returns whether the current CI configuration is supported
-    """
-    if not KAFKA_VERSION:
-        return False
-
-    if flavor != os.environ.get('KAFKA_OFFSETS_STORAGE'):
-        return False
-
-    return True
-
-
-def is_legacy_check():
-    return os.environ.get('KAFKA_OFFSETS_STORAGE') == 'zookeeper' or os.environ.get('KAFKA_VERSION', '').startswith(
-        '0.9'
-    )
diff --git a/kafka_consumer/tests/conftest.py b/kafka_consumer/tests/conftest.py
index 894799a6697ec..d5dfe9a181d0b 100644
--- a/kafka_consumer/tests/conftest.py
+++ b/kafka_consumer/tests/conftest.py
@@ -11,8 +11,8 @@
 
 from datadog_checks.dev import WaitFor, docker_run
 
-from .common import DOCKER_IMAGE_PATH, HOST_IP, KAFKA_CONNECT_STR, KAFKA_VERSION, PARTITIONS, TOPICS, ZK_CONNECT_STR
-from .runners import KConsumer, Producer, ZKConsumer
+from .common import DOCKER_IMAGE_PATH, HOST_IP, KAFKA_CONNECT_STR, KAFKA_VERSION, TOPICS
+from .runners import KConsumer, Producer
 
 
 def find_topics():
@@ -24,11 +24,7 @@ def find_topics():
 
 
 def initialize_topics():
-    flavor = os.environ.get('KAFKA_OFFSETS_STORAGE')
-    if flavor == 'zookeeper':
-        consumer = ZKConsumer(TOPICS, PARTITIONS)
-    else:
-        consumer = KConsumer(TOPICS)
+    consumer = KConsumer(TOPICS)
 
     with Producer():
         with consumer:
@@ -73,15 +69,6 @@ def dd_environment(mock_local_kafka_hosts_dns, e2e_instance):
 }
 
 
-@pytest.fixture(scope='session')
-def zk_instance():
-    return {
-        'kafka_connect_str': KAFKA_CONNECT_STR,
-        'zk_connect_str': ZK_CONNECT_STR,
-        'consumer_groups': {'my_consumer': {'marvel': [0]}},
-    }
-
-
 @pytest.fixture(scope='session')
 def kafka_instance():
     return {
@@ -116,12 +103,8 @@ def kafka_instance_tls():
 
 
 @pytest.fixture(scope='session')
-def e2e_instance(kafka_instance, zk_instance):
-    flavor = os.environ.get('KAFKA_OFFSETS_STORAGE')
-    if flavor == 'kafka':
-        return kafka_instance
-    elif flavor == 'zookeeper':
-        return zk_instance
+def e2e_instance(kafka_instance):
+    return kafka_instance
 
 
 def _get_bootstrap_server_flag():
diff --git a/kafka_consumer/tests/docker/single-broker.yaml b/kafka_consumer/tests/docker/single-broker.yaml
deleted file mode 100644
index 4de86331a7d38..0000000000000
--- a/kafka_consumer/tests/docker/single-broker.yaml
+++ /dev/null
@@ -1,22 +0,0 @@
-version: '3.5'
-
-services:
-  zookeeper:
-    image: zookeeper:${ZK_VERSION}
-    ports:
-      - 2181:2181
-
-  kafka:
-    image: "wurstmeister/kafka:${KAFKA_VERSION}"
-    ports:
-      - 9092:9092
-    environment:
-      KAFKA_BROKER_ID: 1
-      KAFKA_HOST: ${KAFKA_HOST}
-      KAFKA_ADVERTISED_HOST_NAME: ${KAFKA_HOST}
-      KAFKA_ADVERTISED_PORT: "9092"
-      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
-      KAFKA_CREATE_TOPICS: "marvel:2:1,dc:2:1"
-      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
-    depends_on:
-      - zookeeper
diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py
index 6b3db6827ffee..b667f40b8c8a3 100644
--- a/kafka_consumer/tests/test_kafka_consumer.py
+++ b/kafka_consumer/tests/test_kafka_consumer.py
@@ -11,15 +11,8 @@
 
 from datadog_checks.kafka_consumer import KafkaCheck
 from datadog_checks.kafka_consumer.kafka_consumer import OAuthTokenProvider
-from datadog_checks.kafka_consumer.legacy_0_10_2 import LegacyKafkaCheck_0_10_2
-from datadog_checks.kafka_consumer.new_kafka_consumer import NewKafkaConsumerCheck
-
-from .common import KAFKA_CONNECT_STR, is_legacy_check, is_supported
-
-pytestmark = pytest.mark.skipif(
-    not is_supported('kafka'), reason='kafka consumer offsets not supported in current environment'
-)
 
+from .common import KAFKA_CONNECT_STR
 
 BROKER_METRICS = ['kafka.broker_offset']
 
@@ -37,33 +30,11 @@ def mocked_time():
     return 400
 
 
-@pytest.mark.unit
-def test_uses_legacy_implementation_when_legacy_version_specified(kafka_instance):
-    instance = copy.deepcopy(kafka_instance)
-    instance['kafka_client_api_version'] = '0.10.1'
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [instance])
-    kafka_consumer_check._init_check_based_on_kafka_version()
-
-    assert isinstance(kafka_consumer_check.sub_check, LegacyKafkaCheck_0_10_2)
-
-
-@pytest.mark.unit
-def test_uses_new_implementation_when_new_version_specified(kafka_instance):
-    instance = copy.deepcopy(kafka_instance)
-    instance['kafka_client_api_version'] = '0.10.2'
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [instance])
-    kafka_consumer_check._init_check_based_on_kafka_version()
-
-    assert isinstance(kafka_consumer_check.sub_check, NewKafkaConsumerCheck)
-
-
 @pytest.mark.unit
 def test_get_interpolated_timestamp(kafka_instance):
     instance = copy.deepcopy(kafka_instance)
-    instance['kafka_client_api_version'] = '0.10.2'
     instance['sasl_kerberos_service_name'] = 'kafka'
     check = KafkaCheck('kafka_consumer', {}, [instance])
-    check._init_check_based_on_kafka_version()
     # at offset 0, time is 100s, at offset 10, time is 200sec.
     # by interpolation, at offset 5, time should be 150sec.
     assert check.sub_check._get_interpolated_timestamp({0: 100, 10: 200}, 5) == 150
@@ -75,7 +46,6 @@ def test_get_interpolated_timestamp(kafka_instance):
 @pytest.mark.unit
 def test_gssapi(kafka_instance, dd_run_check):
     instance = copy.deepcopy(kafka_instance)
-    instance['kafka_client_api_version'] = '0.10.2'
     instance['sasl_mechanism'] = 'GSSAPI'
     instance['security_protocol'] = 'SASL_PLAINTEXT'
     instance['sasl_kerberos_service_name'] = 'kafka'
@@ -218,7 +188,7 @@ def assert_check_kafka(aggregator, consumer_groups, data_streams_enabled=False):
                     aggregator.assert_metric(mname, tags=tags, at_least=1)
                 for mname in CONSUMER_METRICS:
                     aggregator.assert_metric(mname, tags=tags + ["consumer_group:{}".format(name)], at_least=1)
-    if not is_legacy_check() and data_streams_enabled:
+    if data_streams_enabled:
         # in the e2e test, Kafka is not actively receiving data. So we never populate the broker
         # timestamps with more than one timestamp. So we can't interpolate to get the consumer
         # timestamp.
@@ -229,7 +199,6 @@ def assert_check_kafka(aggregator, consumer_groups, data_streams_enabled=False):
     aggregator.assert_all_metrics_covered()
 
 
-@pytest.mark.skipif(is_legacy_check(), reason="This test does not apply to the legacy check.")
 @pytest.mark.integration
 @pytest.mark.usefixtures('dd_environment')
 def test_consumer_config_error(caplog, dd_run_check):
@@ -240,7 +209,6 @@ def test_consumer_config_error(caplog, dd_run_check):
     assert 'monitor_unlisted_consumer_groups is False' in caplog.text
 
 
-@pytest.mark.skipif(is_legacy_check(), reason="This test does not apply to the legacy check.")
 @pytest.mark.integration
 @pytest.mark.usefixtures('dd_environment')
 def test_no_topics(aggregator, kafka_instance, dd_run_check):
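The removed dispatch tests above have no direct replacement; with construction now unconditional, the closest equivalent assertion would simply check the type of sub_check. A hypothetical sketch, assuming the kafka_instance fixture defined in tests/conftest.py:

    # Hypothetical unit test; not part of this change.
    import pytest

    from datadog_checks.kafka_consumer import KafkaCheck
    from datadog_checks.kafka_consumer.new_kafka_consumer import NewKafkaConsumerCheck


    @pytest.mark.unit
    def test_always_uses_new_implementation(kafka_instance):
        check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
        assert isinstance(check.sub_check, NewKafkaConsumerCheck)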
diff --git a/kafka_consumer/tests/test_kafka_consumer_zk.py b/kafka_consumer/tests/test_kafka_consumer_zk.py
deleted file mode 100644
index 621c1fc96f755..0000000000000
--- a/kafka_consumer/tests/test_kafka_consumer_zk.py
+++ /dev/null
@@ -1,125 +0,0 @@
-# (C) Datadog, Inc. 2018-present
-# All rights reserved
-# Licensed under Simplified BSD License (see LICENSE)
-import copy
-
-import pytest
-
-from datadog_checks.kafka_consumer import KafkaCheck
-
-from .common import HOST, PARTITIONS, TOPICS, is_supported
-
-pytestmark = pytest.mark.skipif(
-    not is_supported('zookeeper'), reason='zookeeper consumer offsets not supported in current environment'
-)
-
-
-BROKER_METRICS = ['kafka.broker_offset']
-
-CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag']
-
-
-@pytest.mark.usefixtures('dd_environment')
-def test_check_zk_basic_case_integration(aggregator, zk_instance, dd_run_check):
-    """
-    Testing Kafka_consumer check.
-    """
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [zk_instance])
-    dd_run_check(kafka_consumer_check)
-
-    _assert_check_zk_basic_case(aggregator, zk_instance)
-
-
-@pytest.mark.e2e
-def test_check_zk_basic_case_e2e(dd_agent_check, zk_instance):
-    aggregator = dd_agent_check(zk_instance)
-
-    _assert_check_zk_basic_case(aggregator, zk_instance)
-
-
-def _assert_check_zk_basic_case(aggregator, zk_instance):
-    for name, consumer_group in zk_instance['consumer_groups'].items():
-        for topic, partitions in consumer_group.items():
-            for partition in partitions:
-                tags = ["topic:{}".format(topic), "partition:{}".format(partition)]
-                for mname in BROKER_METRICS:
-                    aggregator.assert_metric(mname, tags=tags, at_least=1)
-                for mname in CONSUMER_METRICS:
-                    aggregator.assert_metric(
-                        mname, tags=tags + ["source:zk", "consumer_group:{}".format(name)], at_least=1
-                    )
-
-    aggregator.assert_all_metrics_covered()
-
-
-@pytest.mark.usefixtures('dd_environment')
-def test_multiple_servers_zk(aggregator, zk_instance, dd_run_check):
-    """
-    Testing Kafka_consumer check.
-    """
-    multiple_server_zk_instance = copy.deepcopy(zk_instance)
-    multiple_server_zk_instance['kafka_connect_str'] = [
-        multiple_server_zk_instance['kafka_connect_str'],
-        '{}:9092'.format(HOST),
-    ]
-
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [multiple_server_zk_instance])
-    dd_run_check(kafka_consumer_check)
-
-    for name, consumer_group in multiple_server_zk_instance['consumer_groups'].items():
-        for topic, partitions in consumer_group.items():
-            for partition in partitions:
-                tags = ["topic:{}".format(topic), "partition:{}".format(partition)]
-                for mname in BROKER_METRICS:
-                    aggregator.assert_metric(mname, tags=tags, at_least=1)
-                for mname in CONSUMER_METRICS:
-                    aggregator.assert_metric(
-                        mname, tags=tags + ["source:zk", "consumer_group:{}".format(name)], at_least=1
-                    )
-
-    aggregator.assert_all_metrics_covered()
-
-
-@pytest.mark.usefixtures('dd_environment')
-def test_check_no_groups_zk(aggregator, zk_instance, dd_run_check):
-    """
-    Testing Kafka_consumer check grabbing groups from ZK
-    """
-    nogroup_instance = copy.deepcopy(zk_instance)
-    nogroup_instance.pop('consumer_groups')
-    nogroup_instance['monitor_unlisted_consumer_groups'] = True
-
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [nogroup_instance])
-    dd_run_check(kafka_consumer_check)
-
-    for topic in TOPICS:
-        for partition in PARTITIONS:
-            tags = ["topic:{}".format(topic), "partition:{}".format(partition)]
-            for mname in BROKER_METRICS:
-                aggregator.assert_metric(mname, tags=tags, at_least=1)
-            for mname in CONSUMER_METRICS:
-                aggregator.assert_metric(mname, tags=tags + ['source:zk', 'consumer_group:my_consumer'], at_least=1)
-
-    aggregator.assert_all_metrics_covered()
-
-
-@pytest.mark.usefixtures('dd_environment')
-def test_check_no_partitions_zk(aggregator, zk_instance, dd_run_check):
-    """
-    Testing Kafka_consumer check grabbing partitions from ZK
-    """
-    no_partitions_instance = copy.deepcopy(zk_instance)
-    topic = 'marvel'
-    no_partitions_instance['consumer_groups'] = {'my_consumer': {topic: []}}
-
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [no_partitions_instance])
-    dd_run_check(kafka_consumer_check)
-
-    for partition in PARTITIONS:
-        tags = ["topic:{}".format(topic), "partition:{}".format(partition)]
-        for mname in BROKER_METRICS:
-            aggregator.assert_metric(mname, tags=tags, at_least=1)
-        for mname in CONSUMER_METRICS:
-            aggregator.assert_metric(mname, tags=tags + ['source:zk', 'consumer_group:my_consumer'], at_least=1)
-
-    aggregator.assert_all_metrics_covered()