Skip to content

Commit

Permalink
kafka_consumer: go back to python 3.8
Browse files Browse the repository at this point in the history
  • Loading branch information
piochelepiotr committed May 3, 2022
1 parent 0bf6c28 commit b00a110
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# Licensed under Simplified BSD License (see LICENSE)
from collections import defaultdict
from time import time
import os

from kafka import errors as kafka_errors
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
Expand Down Expand Up @@ -280,9 +279,10 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
if reported_contexts >= contexts_limit:
continue
timestamps = self._broker_timestamps[(topic, partition)]
# producer_timestamp is set in the same check, so it should never be None
producer_timestamp = timestamps[producer_offset]
consumer_timestamp = self._get_interpolated_timestamp(timestamps, consumer_offset)
if consumer_timestamp is None:
if consumer_timestamp is None or producer_timestamp is None:
continue
lag = consumer_timestamp - producer_timestamp
self.gauge('consumer_lag_seconds', lag, tags=consumer_group_tags)
Expand All @@ -303,7 +303,7 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):

def _get_interpolated_timestamp(self, timestamps, offset):
if offset in timestamps:
return timestamps[offset], True
return timestamps[offset]
offsets = timestamps.keys()
try:
# Get the most close saved offsets to the consumer_offset
Expand Down
8 changes: 4 additions & 4 deletions kafka_consumer/tox.ini
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
[tox]
isolated_build = true
minversion = 2.0
basepython = py39
basepython = py38
envlist =
py27-{0.9,latest}-{kafka,zk}
py39-{0.9,0.11,1.1,2.3,latest}-{kafka,zk}
py38-{0.9,0.11,1.1,2.3,latest}-{kafka,zk}

[testenv]
ensure_default_envdir = true
envdir =
py27: {toxworkdir}/py27
py39: {toxworkdir}/py39
py38: {toxworkdir}/py38
description =
py{27,39}-{0.9,latest}-{kafka,zk}: e2e ready
py{27,38}-{0.9,latest}-{kafka,zk}: e2e ready
dd_check_style = true
usedevelop = true
platform = linux|darwin|win32
Expand Down

0 comments on commit b00a110

Please sign in to comment.