Skip to content

Commit

Permalink
reset client offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
fanny-jiang committed Feb 15, 2023
1 parent 560927b commit 344800b
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -415,3 +415,7 @@ def get_partitions_for_topic(self, topic):

def request_metadata_update(self):
self.kafka_client._client.cluster.request_update()

def reset_offsets(self):
self._consumer_offsets = {}
self._highwater_offsets = {}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def __init__(self, name, init_config, instances):
def check(self, _):
"""The main entrypoint of the check."""
# Fetch Kafka consumer offsets
self.client.reset_offsets()

try:
self.client.get_consumer_offsets()
except Exception:
Expand Down Expand Up @@ -158,7 +160,7 @@ def collect_broker_metadata(self):
version_data = [str(part) for part in self.client.collect_broker_version()]
version_parts = {name: part for name, part in zip(('major', 'minor', 'patch'), version_data)}

self.check.set_metadata(
self.set_metadata(
'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts
)

Expand Down

0 comments on commit 344800b

Please sign in to comment.