Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert remaining KafkaConsumer tests to pytest #1886

Merged
merged 2 commits into from
Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import absolute_import

import uuid

import pytest

from test.testutil import env_kafka_version, random_string
Expand Down Expand Up @@ -137,3 +139,27 @@ def _set_conn_state(state):
conn.connected = lambda: conn.state is ConnectionStates.CONNECTED
conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED
return conn


@pytest.fixture()
def send_messages(topic, kafka_producer, request):
"""A factory that returns a send_messages function with a pre-populated
topic topic / producer."""

def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request):
"""
messages is typically `range(0,100)`
partition is an int
"""
messages_and_futures = [] # [(message, produce_future),]
for i in number_range:
# request.node.name provides the test name (including parametrized values)
encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8')
future = kafka_producer.send(topic, value=encoded_msg, partition=partition)
messages_and_futures.append((encoded_msg, future))
kafka_producer.flush()
for (msg, f) in messages_and_futures:
assert f.succeeded()
return [msg for (msg, f) in messages_and_futures]

return _send_messages
2 changes: 2 additions & 0 deletions test/test_consumer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def test_consumer(kafka_broker, topic):
assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED
consumer.close()


@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_consumer_topics(kafka_broker, topic):
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
Expand All @@ -38,6 +39,7 @@ def test_consumer_topics(kafka_broker, topic):
assert len(consumer.partitions_for_topic(topic)) > 0
consumer.close()


@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
def test_group(kafka_broker, topic):
num_partitions = 4
Expand Down
Loading