From 20f152bfb3d3ba0b5ac24b8d34ffde3cebccbfff Mon Sep 17 00:00:00 2001 From: Jeff Widman Date: Wed, 21 Aug 2019 17:54:16 -0700 Subject: [PATCH 1/2] Convert remaining `KafkaConsumer` tests to `pytest` This makes it so the only remaining use of `unittest` is in the old tests of the deprecated `Simple*` clients. All `KafkaConsumer` tests are migrated to `pytest`. --- test/conftest.py | 26 ++ test/test_consumer_group.py | 2 + test/test_consumer_integration.py | 501 +++++++++++++++--------------- test/testutil.py | 11 + 4 files changed, 284 insertions(+), 256 deletions(-) diff --git a/test/conftest.py b/test/conftest.py index 5015cc7a1..267ac6aa9 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,5 +1,7 @@ from __future__ import absolute_import +import uuid + import pytest from test.testutil import env_kafka_version, random_string @@ -137,3 +139,27 @@ def _set_conn_state(state): conn.connected = lambda: conn.state is ConnectionStates.CONNECTED conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED return conn + + +@pytest.fixture() +def send_messages(topic, kafka_producer, request): + """A factory that returns a send_messages function with a pre-populated + topic topic / producer.""" + + def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request): + """ + messages is typically `range(0,100)` + partition is an int + """ + messages_and_futures = [] # [(message, produce_future),] + for i in number_range: + # request.node.name provides the test name (including parametrized values) + encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8') + future = kafka_producer.send(topic, value=encoded_msg, partition=partition) + messages_and_futures.append((encoded_msg, future)) + kafka_producer.flush() + for (msg, f) in messages_and_futures: + assert f.succeeded() + return [msg for (msg, f) in messages_and_futures] + + return _send_messages diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 33676179d..58dc7ebf9 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -29,6 +29,7 @@ def test_consumer(kafka_broker, topic): assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED consumer.close() + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_consumer_topics(kafka_broker, topic): consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker)) @@ -38,6 +39,7 @@ def test_consumer_topics(kafka_broker, topic): assert len(consumer.partitions_for_topic(topic)) > 0 consumer.close() + @pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version') def test_group(kafka_broker, topic): num_partitions = 4 diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index cb0524294..1dc436c76 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -4,7 +4,6 @@ from mock import patch import pytest -from kafka.vendor import six from kafka.vendor.six.moves import range from . import unittest @@ -23,34 +22,26 @@ ) from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import KafkaIntegrationTestCase, Timer, env_kafka_version, random_string +from test.testutil import KafkaIntegrationTestCase, Timer, assert_message_count, env_kafka_version, random_string @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") -def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory): +def test_kafka_consumer(kafka_consumer_factory, send_messages): """Test KafkaConsumer""" - kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest') - - # TODO replace this with a `send_messages()` pytest fixture - # as we will likely need this elsewhere - for i in range(0, 100): - kafka_producer.send(topic, partition=0, value=str(i).encode()) - for i in range(100, 200): - kafka_producer.send(topic, partition=1, value=str(i).encode()) - kafka_producer.flush() - + consumer = kafka_consumer_factory(auto_offset_reset='earliest') + send_messages(range(0, 100), partition=0) + send_messages(range(0, 100), partition=1) cnt = 0 - messages = {0: set(), 1: set()} - for message in kafka_consumer: + messages = {0: [], 1: []} + for message in consumer: logging.debug("Consumed message %s", repr(message)) cnt += 1 - messages[message.partition].add(message.offset) + messages[message.partition].append(message) if cnt >= 200: break - assert len(messages[0]) == 100 - assert len(messages[1]) == 100 - kafka_consumer.close() + assert_message_count(messages[0], 100) + assert_message_count(messages[1], 100) @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @@ -547,242 +538,240 @@ def test_fetch_buffer_size(self): messages = [ message for message in consumer ] self.assertEqual(len(messages), 2) - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") - def test_kafka_consumer__blocking(self): - TIMEOUT_MS = 500 - consumer = self.kafka_consumer(auto_offset_reset='earliest', - enable_auto_commit=False, - consumer_timeout_ms=TIMEOUT_MS) - - # Manual assignment avoids overhead of consumer group mgmt - consumer.unsubscribe() - consumer.assign([TopicPartition(self.topic, 0)]) - # Ask for 5 messages, nothing in queue, block 500ms - with Timer() as t: - with self.assertRaises(StopIteration): - msg = next(consumer) - self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) - - self.send_messages(0, range(0, 10)) - - # Ask for 5 messages, 10 in queue. Get 5 back, no blocking - messages = set() - with Timer() as t: - for i in range(5): +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") +def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages): + TIMEOUT_MS = 500 + consumer = kafka_consumer_factory(auto_offset_reset='earliest', + enable_auto_commit=False, + consumer_timeout_ms=TIMEOUT_MS) + + # Manual assignment avoids overhead of consumer group mgmt + consumer.unsubscribe() + consumer.assign([TopicPartition(topic, 0)]) + + # Ask for 5 messages, nothing in queue, block 500ms + with Timer() as t: + with pytest.raises(StopIteration): + msg = next(consumer) + assert t.interval >= (TIMEOUT_MS / 1000.0) + + send_messages(range(0, 10)) + + # Ask for 5 messages, 10 in queue. Get 5 back, no blocking + messages = [] + with Timer() as t: + for i in range(5): + msg = next(consumer) + messages.append(msg) + assert_message_count(messages, 5) + assert t.interval < (TIMEOUT_MS / 1000.0) + + # Ask for 10 messages, get 5 back, block 500ms + messages = [] + with Timer() as t: + with pytest.raises(StopIteration): + for i in range(10): msg = next(consumer) - messages.add((msg.partition, msg.offset)) - self.assertEqual(len(messages), 5) - self.assertLess(t.interval, TIMEOUT_MS / 1000.0 ) - - # Ask for 10 messages, get 5 back, block 500ms - messages = set() - with Timer() as t: - with self.assertRaises(StopIteration): - for i in range(10): - msg = next(consumer) - messages.add((msg.partition, msg.offset)) - self.assertEqual(len(messages), 5) - self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) - consumer.close() - - @pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1") - def test_kafka_consumer__offset_commit_resume(self): - GROUP_ID = random_string(10) - - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Start a consumer - consumer1 = self.kafka_consumer( - group_id=GROUP_ID, - enable_auto_commit=True, - auto_commit_interval_ms=100, - auto_offset_reset='earliest', - ) - - # Grab the first 180 messages - output_msgs1 = [] - for _ in range(180): - m = next(consumer1) - output_msgs1.append((m.key, m.value)) - self.assert_message_count(output_msgs1, 180) - consumer1.close() - - # The total offset across both partitions should be at 180 - consumer2 = self.kafka_consumer( - group_id=GROUP_ID, - enable_auto_commit=True, - auto_commit_interval_ms=100, - auto_offset_reset='earliest', - ) - - # 181-200 - output_msgs2 = [] - for _ in range(20): - m = next(consumer2) - output_msgs2.append((m.key, m.value)) - self.assert_message_count(output_msgs2, 20) - self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200) - consumer2.close() - - @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") - def test_kafka_consumer_max_bytes_simple(self): - self.send_messages(0, range(100, 200)) - self.send_messages(1, range(200, 300)) - - # Start a consumer - consumer = self.kafka_consumer( - auto_offset_reset='earliest', fetch_max_bytes=300) - seen_partitions = set([]) - for i in range(10): - poll_res = consumer.poll(timeout_ms=100) - for partition, msgs in six.iteritems(poll_res): - for msg in msgs: - seen_partitions.add(partition) - - # Check that we fetched at least 1 message from both partitions - self.assertEqual( - seen_partitions, set([ - TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)])) - consumer.close() - - @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") - def test_kafka_consumer_max_bytes_one_msg(self): - # We send to only 1 partition so we don't have parallel requests to 2 - # nodes for data. - self.send_messages(0, range(100, 200)) - - # Start a consumer. FetchResponse_v3 should always include at least 1 - # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time - # But 0.11.0.0 returns 1 MessageSet at a time when the messages are - # stored in the new v2 format by the broker. - # - # DP Note: This is a strange test. The consumer shouldn't care - # how many messages are included in a FetchResponse, as long as it is - # non-zero. I would not mind if we deleted this test. It caused - # a minor headache when testing 0.11.0.0. - group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5) - consumer = self.kafka_consumer( - group_id=group, - auto_offset_reset='earliest', - consumer_timeout_ms=5000, - fetch_max_bytes=1) - - fetched_msgs = [next(consumer) for i in range(10)] - self.assertEqual(len(fetched_msgs), 10) - consumer.close() - - @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") - def test_kafka_consumer_offsets_for_time(self): - late_time = int(time.time()) * 1000 - middle_time = late_time - 1000 - early_time = late_time - 2000 - tp = TopicPartition(self.topic, 0) - - timeout = 10 - kafka_producer = self.kafka_producer() - early_msg = kafka_producer.send( - self.topic, partition=0, value=b"first", - timestamp_ms=early_time).get(timeout) - late_msg = kafka_producer.send( - self.topic, partition=0, value=b"last", - timestamp_ms=late_time).get(timeout) - - consumer = self.kafka_consumer() - offsets = consumer.offsets_for_times({tp: early_time}) - self.assertEqual(len(offsets), 1) - self.assertEqual(offsets[tp].offset, early_msg.offset) - self.assertEqual(offsets[tp].timestamp, early_time) - - offsets = consumer.offsets_for_times({tp: middle_time}) - self.assertEqual(offsets[tp].offset, late_msg.offset) - self.assertEqual(offsets[tp].timestamp, late_time) - - offsets = consumer.offsets_for_times({tp: late_time}) - self.assertEqual(offsets[tp].offset, late_msg.offset) - self.assertEqual(offsets[tp].timestamp, late_time) - - offsets = consumer.offsets_for_times({}) - self.assertEqual(offsets, {}) - - # Out of bound timestamps check - - offsets = consumer.offsets_for_times({tp: 0}) - self.assertEqual(offsets[tp].offset, early_msg.offset) - self.assertEqual(offsets[tp].timestamp, early_time) - - offsets = consumer.offsets_for_times({tp: 9999999999999}) - self.assertEqual(offsets[tp], None) - - # Beginning/End offsets - - offsets = consumer.beginning_offsets([tp]) - self.assertEqual(offsets, { - tp: early_msg.offset, - }) - offsets = consumer.end_offsets([tp]) - self.assertEqual(offsets, { - tp: late_msg.offset + 1 - }) - consumer.close() - - @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") - def test_kafka_consumer_offsets_search_many_partitions(self): - tp0 = TopicPartition(self.topic, 0) - tp1 = TopicPartition(self.topic, 1) - - kafka_producer = self.kafka_producer() - send_time = int(time.time() * 1000) - timeout = 10 - p0msg = kafka_producer.send( - self.topic, partition=0, value=b"XXX", - timestamp_ms=send_time).get(timeout) - p1msg = kafka_producer.send( - self.topic, partition=1, value=b"XXX", - timestamp_ms=send_time).get(timeout) - - consumer = self.kafka_consumer() - offsets = consumer.offsets_for_times({ - tp0: send_time, - tp1: send_time - }) - - self.assertEqual(offsets, { - tp0: OffsetAndTimestamp(p0msg.offset, send_time), - tp1: OffsetAndTimestamp(p1msg.offset, send_time) - }) - - offsets = consumer.beginning_offsets([tp0, tp1]) - self.assertEqual(offsets, { - tp0: p0msg.offset, - tp1: p1msg.offset - }) - - offsets = consumer.end_offsets([tp0, tp1]) - self.assertEqual(offsets, { - tp0: p0msg.offset + 1, - tp1: p1msg.offset + 1 - }) - consumer.close() - - @pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1") - def test_kafka_consumer_offsets_for_time_old(self): - consumer = self.kafka_consumer() - tp = TopicPartition(self.topic, 0) - - with self.assertRaises(UnsupportedVersionError): - consumer.offsets_for_times({tp: int(time.time())}) - - @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") - def test_kafka_consumer_offsets_for_times_errors(self): - consumer = self.kafka_consumer(fetch_max_wait_ms=200, - request_timeout_ms=500) - tp = TopicPartition(self.topic, 0) - bad_tp = TopicPartition(self.topic, 100) - - with self.assertRaises(ValueError): - consumer.offsets_for_times({tp: -1}) - - with self.assertRaises(KafkaTimeoutError): - consumer.offsets_for_times({bad_tp: 0}) + messages.append(msg) + assert_message_count(messages, 5) + assert t.interval >= (TIMEOUT_MS / 1000.0) + + +@pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1") +def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messages): + GROUP_ID = random_string(10) + + send_messages(range(0, 100), partition=0) + send_messages(range(100, 200), partition=1) + + # Start a consumer and grab the first 180 messages + consumer1 = kafka_consumer_factory( + group_id=GROUP_ID, + enable_auto_commit=True, + auto_commit_interval_ms=100, + auto_offset_reset='earliest', + ) + output_msgs1 = [] + for _ in range(180): + m = next(consumer1) + output_msgs1.append(m) + assert_message_count(output_msgs1, 180) + + # Normally we let the pytest fixture `kafka_consumer_factory` handle + # closing as part of its teardown. Here we manually call close() to force + # auto-commit to occur before the second consumer starts. That way the + # second consumer only consumes previously unconsumed messages. + consumer1.close() + + # Start a second consumer to grab 181-200 + consumer2 = kafka_consumer_factory( + group_id=GROUP_ID, + enable_auto_commit=True, + auto_commit_interval_ms=100, + auto_offset_reset='earliest', + ) + output_msgs2 = [] + for _ in range(20): + m = next(consumer2) + output_msgs2.append(m) + assert_message_count(output_msgs2, 20) + + # Verify the second consumer wasn't reconsuming messages that the first + # consumer already saw + assert_message_count(output_msgs1 + output_msgs2, 200) + + +@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") +def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_messages): + send_messages(range(100, 200), partition=0) + send_messages(range(200, 300), partition=1) + + # Start a consumer + consumer = kafka_consumer_factory( + auto_offset_reset='earliest', fetch_max_bytes=300) + seen_partitions = set() + for i in range(10): + poll_res = consumer.poll(timeout_ms=100) + for partition, msgs in poll_res.items(): + for msg in msgs: + seen_partitions.add(partition) + + # Check that we fetched at least 1 message from both partitions + assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)} + + +@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") +def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages): + # We send to only 1 partition so we don't have parallel requests to 2 + # nodes for data. + send_messages(range(100, 200)) + + # Start a consumer. FetchResponse_v3 should always include at least 1 + # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time + # But 0.11.0.0 returns 1 MessageSet at a time when the messages are + # stored in the new v2 format by the broker. + # + # DP Note: This is a strange test. The consumer shouldn't care + # how many messages are included in a FetchResponse, as long as it is + # non-zero. I would not mind if we deleted this test. It caused + # a minor headache when testing 0.11.0.0. + group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5) + consumer = kafka_consumer_factory( + group_id=group, + auto_offset_reset='earliest', + consumer_timeout_ms=5000, + fetch_max_bytes=1) + + fetched_msgs = [next(consumer) for i in range(10)] + assert_message_count(fetched_msgs, 10) + + +@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") +def test_kafka_consumer_offsets_for_time(topic, kafka_consumer, kafka_producer): + late_time = int(time.time()) * 1000 + middle_time = late_time - 1000 + early_time = late_time - 2000 + tp = TopicPartition(topic, 0) + + timeout = 10 + early_msg = kafka_producer.send( + topic, partition=0, value=b"first", + timestamp_ms=early_time).get(timeout) + late_msg = kafka_producer.send( + topic, partition=0, value=b"last", + timestamp_ms=late_time).get(timeout) + + consumer = kafka_consumer + offsets = consumer.offsets_for_times({tp: early_time}) + assert len(offsets) == 1 + assert offsets[tp].offset == early_msg.offset + assert offsets[tp].timestamp == early_time + + offsets = consumer.offsets_for_times({tp: middle_time}) + assert offsets[tp].offset == late_msg.offset + assert offsets[tp].timestamp == late_time + + offsets = consumer.offsets_for_times({tp: late_time}) + assert offsets[tp].offset == late_msg.offset + assert offsets[tp].timestamp == late_time + + offsets = consumer.offsets_for_times({}) + assert offsets == {} + + # Out of bound timestamps check + + offsets = consumer.offsets_for_times({tp: 0}) + assert offsets[tp].offset == early_msg.offset + assert offsets[tp].timestamp == early_time + + offsets = consumer.offsets_for_times({tp: 9999999999999}) + assert offsets[tp] is None + + # Beginning/End offsets + + offsets = consumer.beginning_offsets([tp]) + assert offsets == {tp: early_msg.offset} + offsets = consumer.end_offsets([tp]) + assert offsets == {tp: late_msg.offset + 1} + + +@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") +def test_kafka_consumer_offsets_search_many_partitions(kafka_consumer, kafka_producer, topic): + tp0 = TopicPartition(topic, 0) + tp1 = TopicPartition(topic, 1) + + send_time = int(time.time() * 1000) + timeout = 10 + p0msg = kafka_producer.send( + topic, partition=0, value=b"XXX", + timestamp_ms=send_time).get(timeout) + p1msg = kafka_producer.send( + topic, partition=1, value=b"XXX", + timestamp_ms=send_time).get(timeout) + + consumer = kafka_consumer + offsets = consumer.offsets_for_times({ + tp0: send_time, + tp1: send_time + }) + + assert offsets == { + tp0: OffsetAndTimestamp(p0msg.offset, send_time), + tp1: OffsetAndTimestamp(p1msg.offset, send_time) + } + + offsets = consumer.beginning_offsets([tp0, tp1]) + assert offsets == { + tp0: p0msg.offset, + tp1: p1msg.offset + } + + offsets = consumer.end_offsets([tp0, tp1]) + assert offsets == { + tp0: p0msg.offset + 1, + tp1: p1msg.offset + 1 + } + + +@pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1") +def test_kafka_consumer_offsets_for_time_old(kafka_consumer, topic): + consumer = kafka_consumer + tp = TopicPartition(topic, 0) + + with pytest.raises(UnsupportedVersionError): + consumer.offsets_for_times({tp: int(time.time())}) + + +@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") +def test_kafka_consumer_offsets_for_times_errors(kafka_consumer_factory, topic): + consumer = kafka_consumer_factory(fetch_max_wait_ms=200, + request_timeout_ms=500) + tp = TopicPartition(topic, 0) + bad_tp = TopicPartition(topic, 100) + + with pytest.raises(ValueError): + consumer.offsets_for_times({tp: -1}) + + with pytest.raises(KafkaTimeoutError): + consumer.offsets_for_times({bad_tp: 0}) diff --git a/test/testutil.py b/test/testutil.py index 327226205..650f9bf29 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -47,6 +47,17 @@ def current_offset(client, topic, partition, kafka_broker=None): return offsets.offsets[0] +def assert_message_count(messages, num_messages): + """Check that we received the expected number of messages with no duplicates.""" + # Make sure we got them all + assert len(messages) == num_messages + # Make sure there are no duplicates + # Note: Currently duplicates are identified only using key/value. Other attributes like topic, partition, headers, + # timestamp, etc are ignored... this could be changed if necessary, but will be more tolerant of dupes. + unique_messages = {(m.key, m.value) for m in messages} + assert len(unique_messages) == num_messages + + class KafkaIntegrationTestCase(unittest.TestCase): create_client = True topic = None From f370d97b0d08771a36e0d3b128f33f555e9433fb Mon Sep 17 00:00:00 2001 From: Jeff Widman Date: Thu, 22 Aug 2019 17:43:52 -0700 Subject: [PATCH 2/2] Bump failing test iterations to 100--is travis happy? Locally I can Kafka 1.0.1 with python 3.7 to pass intermittently. On Travis it's not passing. So wondering if I just need more iterations. Hopefully we only need 20-50, but trying 90 here to see if that truly fixes it without actually going over the 100 messages we sent... ie, cn we fetch from both partitions or does it force us to do 100 messages from first partition before going onto second partition (which would be a problem)?? Note: not guaranteed that 90 cycles isn't getting us all the 100 messages in the first partition since `fetch_max_bytes=300`... but a shot in the dark here... if Travis still fails even with this, then may be something else is the problem. --- test/test_consumer_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 1dc436c76..c7e2ebf5e 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -631,7 +631,7 @@ def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_mes consumer = kafka_consumer_factory( auto_offset_reset='earliest', fetch_max_bytes=300) seen_partitions = set() - for i in range(10): + for i in range(90): poll_res = consumer.poll(timeout_ms=100) for partition, msgs in poll_res.items(): for msg in msgs: