Merge two tests that are very similar
Previously, `test_kafka_consumer_max_bytes_simple()` saw occasional test failures because it ran only 10 poll iterations. Much of its purpose was also gutted when Kafka 0.11 came out and changed the fetch behavior. So this merges the two tests into one, which should be relatively straightforward.

Further discussion in https://github.com/dpkp/kafka-python/pull/1886/files#r316860737
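
For context, a minimal sketch of why a bounded `poll()` loop is timing-sensitive while blocking iteration is not. This is illustrative only, not code from the repo: the broker address and `example-topic` are assumptions.

```python
# Hypothetical reproduction (assumed local broker and pre-populated topic).
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'example-topic',                 # assumed: messages already in two partitions
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,        # iteration raises StopIteration if idle this long
)

# Old, timing-sensitive pattern: each poll() may legitimately return an empty
# dict before fetches complete, so a small iteration count can miss a partition
# on a slow broker.
seen_partitions = set()
for _ in range(10):
    for tp, records in consumer.poll(timeout_ms=100).items():
        if records:
            seen_partitions.add(tp)

# The merged test's pattern instead blocks per record, so the number of
# messages collected is deterministic whenever the data exists:
#     messages = [next(consumer) for _ in range(25)]
```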
jeffwidman committed Aug 23, 2019
1 parent 61fa0b2 commit 410f03b
Showing 1 changed file with 15 additions and 35 deletions.
test/test_consumer_integration.py (15 additions, 35 deletions):

```diff
@@ -623,48 +623,28 @@ def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messages):
 
 
 @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
-def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_messages):
-    send_messages(range(100, 200), partition=0)
-    send_messages(range(200, 300), partition=1)
-
-    # Start a consumer
-    consumer = kafka_consumer_factory(
-        auto_offset_reset='earliest', fetch_max_bytes=300)
-    seen_partitions = set()
-    for i in range(90):
-        poll_res = consumer.poll(timeout_ms=100)
-        for partition, msgs in poll_res.items():
-            for msg in msgs:
-                seen_partitions.add(partition)
-
-    # Check that we fetched at least 1 message from both partitions
-    assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}
-
-
-@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
-def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages):
-    # We send to only 1 partition so we don't have parallel requests to 2
-    # nodes for data.
-    send_messages(range(100, 200))
-
-    # Start a consumer. FetchResponse_v3 should always include at least 1
-    # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time
-    # But 0.11.0.0 returns 1 MessageSet at a time when the messages are
-    # stored in the new v2 format by the broker.
-    #
-    # DP Note: This is a strange test. The consumer shouldn't care
-    # how many messages are included in a FetchResponse, as long as it is
-    # non-zero. I would not mind if we deleted this test. It caused
-    # a minor headache when testing 0.11.0.0.
-    group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5)
-    consumer = kafka_consumer_factory(
-        group_id=group,
-        auto_offset_reset='earliest',
-        consumer_timeout_ms=5000,
-        fetch_max_bytes=1)
-
-    fetched_msgs = [next(consumer) for i in range(10)]
-    assert_message_count(fetched_msgs, 10)
+def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, topic, send_messages):
+    """Check that messages larger than fetch_max_bytes are still received.
+
+    We are checking for both partition starvation and messages simply not being
+    received. The broker should reply with them, just making sure the consumer
+    isn't doing anything unexpected client-side that blocks them.
+    """
+    send_messages(range(0, 100), partition=0)
+    send_messages(range(100, 200), partition=1)
+
+    consumer = kafka_consumer_factory(auto_offset_reset='earliest', fetch_max_bytes=1)
+
+    messages = [next(consumer) for i in range(25)]
+    assert_message_count(messages, 25)
+
+    # Check that we fetched at least 1 message from both partitions
+    seen_partitions = {(m.topic, m.partition) for m in messages}
+    assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}
 
 
 @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
```
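
As the deleted comments note, the broker replies with at least one message per fetch even when it exceeds `fetch_max_bytes` (brokers storing messages in the v2 format reply with a whole batch at a time), so the consumer keeps making progress. A standalone sketch of the merged test's assertion, again with an assumed local broker and a hypothetical topic name:

```python
# Illustrative only: assumes a local broker and a two-partition
# 'example-topic' already holding at least 25 messages.
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    'example-topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,
    fetch_max_bytes=1,  # far below any message size, yet fetching still progresses
)

# 25 blocking reads should pull records from both partitions; plain tuples
# compare equal to the TopicPartition namedtuple, mirroring the test.
messages = [next(consumer) for _ in range(25)]
seen_partitions = {(m.topic, m.partition) for m in messages}
assert seen_partitions == {TopicPartition('example-topic', 0),
                           TopicPartition('example-topic', 1)}
```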
