Commit 765171e

Add Apache Kafka basic support (#105)

fdr400 authored Sep 24, 2024
1 parent 0ee64c4 commit 765171e
Showing 18 changed files with 992 additions and 1 deletion.
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
@@ -32,6 +32,16 @@ jobs:
sudo apt-get update
sudo apt-get install -y redis-server clickhouse-common-static=22.3.2.2
- uses: actions/checkout@v2
- name: Install Kafka
run: |
sudo apt-get install -y default-jre
curl https://dlcdn.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz -o kafka.tgz
sudo mkdir -p /etc/kafka
sudo tar xf kafka.tgz --directory=/etc/kafka
cd /etc/kafka
sudo cp -r kafka_2.13-3.8.0/* ./
sudo rm -rf kafka_2.13-3.8.0
- name: Install RabbitMQ
run: |
.github/install-rabbitmq.sh
3 changes: 3 additions & 0 deletions README.rst
@@ -39,6 +39,9 @@ Installation using pip_::
# testsuite with rabbitmq support
pip3 install yandex-taxi-testsuite[rabbitmq]

# testsuite with kafka support
pip3 install yandex-taxi-testsuite[kafka]

You can also include testsuite into your project as a submodule, e.g.::

mkdir -p submodules
1 change: 1 addition & 0 deletions docs/databases.rst
@@ -19,3 +19,4 @@ Data fixtures are loaded via static files, see :ref:`staticfiles`.
redis.rst
clickhouse.rst
rabbitmq.rst
kafka.rst
161 changes: 161 additions & 0 deletions docs/kafka.rst
@@ -0,0 +1,161 @@
Kafka
========

In order to enable Kafka support you have to add
``testsuite.databases.kafka.pytest_plugin`` to the ``pytest_plugins`` list in your
``conftest.py``.

By default testsuite starts a Kafka_ service. In this case a local Kafka
installation is required.

Currently the Kafka plugin uses the asynchronous aiokafka_ driver.

Kafka installation
---------------------

Consult the official docs at https://kafka.apache.org/quickstart

If you already have Kafka installed and its location differs from
``/etc/kafka``, please set the ``KAFKA_HOME`` environment variable accordingly.

The installed Kafka **must** support the KRaft_ protocol.

Environment variables
---------------------

KAFKA_HOME
~~~~~~~~~~

Use to override Kafka binaries dir. Default is ``/etc/kafka``.

TESTSUITE_KAFKA_SERVER_HOST
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Use to override Kafka server host. Default is ``localhost``.

TESTSUITE_KAFKA_SERVER_PORT
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Use to override Kafka server port. Default is ``9099``.

TESTSUITE_KAFKA_CONTROLLER_PORT
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Use to override Kafka controller port. Default is ``9100``.

TESTSUITE_KAFKA_SERVER_START_TIMEOUT
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

By default testsuite waits up to 10 seconds for Kafka to start;
this timeout may be customized via the ``TESTSUITE_KAFKA_SERVER_START_TIMEOUT`` environment variable.

TESTSUITE_KAFKA_CUSTOM_TOPICS
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

All topics in tests are created automatically by the Kafka broker at runtime with **only 1 partition**.
To create topics with several partitions, either set the ``TESTSUITE_KAFKA_CUSTOM_TOPICS`` environment
variable to a ``,``-separated list of ``topic:partitions-count`` mappings, or override the
``kafka_custom_topics`` fixture (see the integration example below).
For example, ``TESTSUITE_KAFKA_CUSTOM_TOPICS=large-topic-1:7,large-topic-2:20``
creates topic ``large-topic-1`` with 7 partitions and topic ``large-topic-2`` with 20 partitions.

Customize ports
---------------

Testsuite may start Kafka with custom ports if
``TESTSUITE_KAFKA_SERVER_PORT`` or ``TESTSUITE_KAFKA_CONTROLLER_PORT``
environment variables are specified.

Use external instance
---------------------

If your instance is local, you may try setting the
``TESTSUITE_KAFKA_SERVER_PORT`` environment variable and the pytest option
``--kafka=1`` and see if it works.

Note that topic creation remains the user's responsibility.
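
To pre-create topics on an external broker, here is a minimal sketch using
aiokafka's admin client (the broker address and topic parameters are
assumptions for illustration, not part of the testsuite API):

.. code-block:: python

    import asyncio

    from aiokafka.admin import AIOKafkaAdminClient, NewTopic


    async def create_topics() -> None:
        # Assumed address: point this at your external broker.
        admin = AIOKafkaAdminClient(bootstrap_servers='localhost:9099')
        await admin.start()
        try:
            await admin.create_topics([
                NewTopic(
                    name='large-topic-1',
                    num_partitions=7,
                    replication_factor=1,
                ),
            ])
        finally:
            await admin.close()


    asyncio.run(create_topics())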

Usage example
-------------

.. code-block:: python

    async def test_kafka_producer_consumer_chain(kafka_producer, kafka_consumer):
        TOPIC = 'Test-topic-chain'
        KEY = 'test-key'
        MESSAGE = 'test-message'

        await kafka_producer.send(TOPIC, KEY, MESSAGE)

        consumed_message = await kafka_consumer.receive_one([TOPIC])

        assert consumed_message.topic == TOPIC
        assert consumed_message.key == KEY
        assert consumed_message.value == MESSAGE

.. _Kafka: https://kafka.apache.org/
.. _aiokafka: https://github.com/aio-libs/aiokafka
.. _KRaft: https://developer.confluent.io/learn/kraft/

Example integration
-------------------

.. code-block:: python

    import pytest

    pytest_plugins = [
        'testsuite.pytest_plugin',
        'testsuite.databases.kafka.pytest_plugin',
    ]

    KAFKA_CUSTOM_TOPICS = {
        'Large-topic-1': 7,
        'Large-topic-2': 3,
    }


    @pytest.fixture(scope='session')
    def kafka_custom_topics():
        return KAFKA_CUSTOM_TOPICS

Fixtures
--------

.. currentmodule:: testsuite.databases.kafka.pytest_plugin

kafka_producer
~~~~~~~~~~~~~~

.. autofunction:: kafka_producer()
:noindex:

kafka_consumer
~~~~~~~~~~~~~~

.. autofunction:: kafka_consumer()
:noindex:

kafka_custom_topics
~~~~~~~~~~~~~~~~~~~

.. autofunction:: kafka_custom_topics()
:noindex:

kafka_local
~~~~~~~~~~~

.. autofunction:: kafka_local()
:noindex:


Classes
-------

.. currentmodule:: testsuite.databases.kafka.classes

.. autoclass:: KafkaProducer()
:members: send, send_async

.. autoclass:: KafkaConsumer()
:members: receive_one, receive_batch

.. autoclass:: ConsumedMessage()
:members: topic, key, value, partition, offset
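
A short sketch tying these classes together (assuming the ``kafka_producer``
and ``kafka_consumer`` fixtures shown above; the topic name is arbitrary):

.. code-block:: python

    import asyncio


    async def test_send_async_and_batches(kafka_producer, kafka_consumer):
        # send_async schedules a send and returns a delivery future.
        futures = [
            await kafka_producer.send_async('Test-topic', f'key-{i}', f'value-{i}')
            for i in range(10)
        ]
        # Wait until the broker acknowledges every message.
        await asyncio.wait(futures)

        received = 0
        while received < 10:
            # A batch may contain fewer than max_batch_size messages.
            batch = await kafka_consumer.receive_batch(
                topics=['Test-topic'], max_batch_size=5,
            )
            received += len(batch)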
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,4 +1,4 @@
.[mongodb,postgresql,redis,mysql,clickhouse,rabbitmq]
.[mongodb,postgresql,redis,mysql,clickhouse,rabbitmq,kafka]

# Development requirements
flake8
5 changes: 5 additions & 0 deletions setup.py
@@ -27,6 +27,7 @@
'aio-pika==7.2.0; python_version < "3.7"',
'aio-pika>=8.1.0; python_version >= "3.7"',
],
'kafka': ['aiokafka==0.11.0'],
},
setup_requires=['pytest-runner'],
tests_require=['pytest'],
@@ -58,5 +59,9 @@
'scripts/service-rabbitmq',
'scripts/find-rabbitmq.sh',
],
'testsuite.databases.kafka': [
'scripts/service-kafka',
'scripts/find-kafka.sh',
],
},
)
2 changes: 2 additions & 0 deletions tests/conftest.py
@@ -18,6 +18,8 @@
'testsuite.databases.clickhouse.pytest_plugin',
# RabbitMQ
'testsuite.databases.rabbitmq.pytest_plugin',
# Kafka
'testsuite.databases.kafka.pytest_plugin',
]


Empty file.
8 changes: 8 additions & 0 deletions tests/databases/kafka/conftest.py
@@ -0,0 +1,8 @@
import typing

import pytest


@pytest.fixture(scope='session')
def kafka_custom_topics() -> typing.Dict[str, int]:
return {'Large-Topic': 7}
40 changes: 40 additions & 0 deletions tests/databases/kafka/test_producer_basic.py
@@ -0,0 +1,40 @@
import asyncio


async def test_kafka_producer_basic(kafka_producer):
await kafka_producer.send('Test-topic', 'test-key', 'test-message')


async def test_kafka_producer_many_sends(kafka_producer):
TOPIC = 'Test-topic'
SEND_COUNT = 10
for send in range(SEND_COUNT):
await kafka_producer.send(
TOPIC, f'test-key-{send}', f'test-message-{send}'
)


async def test_kafka_producer_many_send_async(kafka_producer):
    TOPIC = 'Test-topic'
    SEND_COUNT = 100
    send_futures = []
    # send_async returns a delivery future instead of awaiting each send.
    for send in range(SEND_COUNT):
        send_futures.append(
            await kafka_producer.send_async(
                TOPIC, f'test-key-{send}', f'test-message-{send}'
            )
        )
    # Wait until the broker acknowledges all queued messages.
    await asyncio.wait(send_futures)


async def test_kafka_producer_large_topic(kafka_producer):
TOPIC = 'Large-Topic'
PARTITION_COUNT = 7

for partition in range(PARTITION_COUNT):
await kafka_producer.send(
TOPIC,
f'key-to-{partition}',
f'message-to-{partition}',
partition=partition,
)
66 changes: 66 additions & 0 deletions tests/databases/kafka/test_producer_consumer_basic.py
@@ -0,0 +1,66 @@
import logging
import typing


async def test_kafka_producer_consumer_chain(kafka_producer, kafka_consumer):
TOPIC = 'Test-topic-chain'
KEY = 'test-key'
MESSAGE = 'test-message'

await kafka_producer.send(TOPIC, KEY, MESSAGE)

consumed_message = await kafka_consumer.receive_one([TOPIC])

assert consumed_message.topic == TOPIC
assert consumed_message.key == KEY
assert consumed_message.value == MESSAGE


async def test_kafka_producer_consumer_chain_many_messages(
kafka_producer, kafka_consumer
):
TOPIC = 'Test-topic-chain'
SEND_COUNT = 10
BATCH_SIZE = 5

for send in range(SEND_COUNT):
await kafka_producer.send(
TOPIC, f'test-key-{send}', f'test-message-{send}'
)

    sends_received: typing.Set[int] = set()

    # Poll in batches until every sent message has been observed.
    while len(sends_received) < SEND_COUNT:
        consumed_messages = await kafka_consumer.receive_batch(
            topics=[TOPIC], max_batch_size=BATCH_SIZE
        )
        logging.info('Received batch of %d messages', len(consumed_messages))
        for message in consumed_messages:
            sends_received.add(int(message.key.split('-')[-1]))


async def test_kafka_producer_consumer_chain_many_topics(
kafka_producer, kafka_consumer
):
TOPIC_COUNT = 3
TOPICS = [f'Test-topic-chain-{i}' for i in range(TOPIC_COUNT)]
SEND_PER_TOPIC_COUNT = 10
MESSAGE_COUNT = TOPIC_COUNT * SEND_PER_TOPIC_COUNT
BATCH_SIZE = 5

for i, topic in enumerate(TOPICS):
for send in range(SEND_PER_TOPIC_COUNT):
send_number = i * SEND_PER_TOPIC_COUNT + send
await kafka_producer.send(
topic, f'test-key-{send_number}', f'test-message-{send_number}'
)

sends_received: typing.Set[int] = set()

while len(sends_received) < MESSAGE_COUNT:
consumed_messages = await kafka_consumer.receive_batch(
topics=TOPICS, max_batch_size=BATCH_SIZE
)
logging.info('Received batch of %d messages', len(consumed_messages))
for message in consumed_messages:
sends_received.add(int(message.key.split('-')[-1]))
Empty file.
