diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8b8429b..5dfd5d4 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -39,7 +39,7 @@ jobs: run: python -m build - name: Publish - uses: pypa/gh-action-pypi-publish@v1.8.14 + uses: pypa/gh-action-pypi-publish@v1.9.0 with: password: ${{ secrets.PYPI_TOKEN }} diff --git a/pyproject.toml b/pyproject.toml index 9e4c486..50c24b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ classifiers = [ dynamic = ["version"] dependencies = [ - "taskiq>=0.10.0,<1.0.0", + "taskiq>=0.11.0,<0.12.0", "faststream>=0.3.14,<0.6.0", ] @@ -59,6 +59,10 @@ kafka = [ "faststream[kafka]" ] +confluent = [ + "faststream[confluent]" +] + redis = [ "faststream[redis]" ] @@ -68,6 +72,7 @@ test = [ "taskiq-faststream[nats]", "taskiq-faststream[rabbit]", "taskiq-faststream[kafka]", + "taskiq-faststream[confluent]", "taskiq-faststream[redis]", "coverage[toml]>=7.2.0,<8.0.0", @@ -77,7 +82,7 @@ test = [ dev = [ "taskiq-faststream[test]", - "mypy>=1.8.0,<1.10.0", + "mypy>=1.8.0,<1.12.0", "ruff==0.4.1", "pre-commit >=3.6.0,<4.0.0", ] diff --git a/taskiq_faststream/__about__.py b/taskiq_faststream/__about__.py index 67739d1..d9e60ec 100644 --- a/taskiq_faststream/__about__.py +++ b/taskiq_faststream/__about__.py @@ -1,2 +1,3 @@ """FastStream - taskiq integration to schedule FastStream tasks.""" -__version__ = "0.1.8" + +__version__ = "0.2.0" diff --git a/taskiq_faststream/broker.py b/taskiq_faststream/broker.py index f295316..35abc1e 100644 --- a/taskiq_faststream/broker.py +++ b/taskiq_faststream/broker.py @@ -5,12 +5,12 @@ import anyio from faststream.app import FastStream from faststream.types import SendableMessage -from taskiq import AsyncBroker, BrokerMessage +from taskiq import AsyncBroker from taskiq.acks import AckableMessage from taskiq.decor import AsyncTaskiqDecoratedTask from typing_extensions import TypeAlias, override -from taskiq_faststream.serializer import PatchedSerializer +from taskiq_faststream.formatter import PatchedFormatter, PathcedMessage from taskiq_faststream.types import ScheduledTask from taskiq_faststream.utils import resolve_msg @@ -33,7 +33,7 @@ class BrokerWrapper(AsyncBroker): def __init__(self, broker: Any) -> None: super().__init__() - self.serializer = PatchedSerializer() + self.formatter = PatchedFormatter() self.broker = broker async def startup(self) -> None: @@ -46,7 +46,7 @@ async def shutdown(self) -> None: await self.broker.close() await super().shutdown() - async def kick(self, message: BrokerMessage) -> None: + async def kick(self, message: PathcedMessage) -> None: # type: ignore[override] """Call wrapped FastStream broker `publish` method.""" await _broker_publish(self.broker, message) @@ -109,7 +109,7 @@ class AppWrapper(BrokerWrapper): def __init__(self, app: FastStream) -> None: super(BrokerWrapper, self).__init__() - self.serializer = PatchedSerializer() + self.formatter = PatchedFormatter() self.app = app async def startup(self) -> None: @@ -122,7 +122,7 @@ async def shutdown(self) -> None: await self.app._shutdown() # noqa: SLF001 await super(BrokerWrapper, self).shutdown() - async def kick(self, message: BrokerMessage) -> None: + async def kick(self, message: PathcedMessage) -> None: # type: ignore[override] """Call wrapped FastStream broker `publish` method.""" assert ( # noqa: S101 self.app.broker @@ -132,11 +132,7 @@ async def kick(self, message: BrokerMessage) -> None: async def _broker_publish( broker: Any, - message: BrokerMessage, + message: PathcedMessage, ) -> None: - labels = message.labels - labels.pop("schedule", None) - async for msg in resolve_msg( - msg=labels.pop("message", message.message), - ): - await broker.publish(msg, **labels) + async for msg in resolve_msg(message.body): + await broker.publish(msg, **message.labels) diff --git a/taskiq_faststream/formatter.py b/taskiq_faststream/formatter.py new file mode 100644 index 0000000..aa9bcc8 --- /dev/null +++ b/taskiq_faststream/formatter.py @@ -0,0 +1,45 @@ +from dataclasses import dataclass +from typing import Any, Dict + +from taskiq.abc.formatter import TaskiqFormatter +from taskiq.message import TaskiqMessage + + +@dataclass +class PathcedMessage: + """DTO to transfer data to `broker.kick`.""" + + body: Any + labels: Dict[str, Any] + + +class PatchedFormatter(TaskiqFormatter): + """Default taskiq formatter.""" + + def dumps( # type: ignore[override] + self, + message: TaskiqMessage, + ) -> PathcedMessage: + """ + Dumps taskiq message to some broker message format. + + :param message: message to send. + :return: Dumped message. + """ + labels = message.labels + labels.pop("schedule", None) + labels.pop("schedule_id", None) + + return PathcedMessage( + body=labels.pop("message", None), + labels=labels, + ) + + def loads(self, message: bytes) -> TaskiqMessage: + """ + Loads json from message. + + :param message: broker's message. + :return: parsed taskiq message. + """ + raise NotImplementedError diff --git a/taskiq_faststream/serializer.py b/taskiq_faststream/serializer.py deleted file mode 100644 index c4900b5..0000000 --- a/taskiq_faststream/serializer.py +++ /dev/null @@ -1,17 +0,0 @@ -from typing import Any - -from taskiq.serializers.json_serializer import JSONSerializer - - -class PatchedSerializer(JSONSerializer): - """Patched serializer removes labels.""" - - def dumpb(self, value: Any) -> bytes: - """ - Dumps taskiq message to some broker message format. - - :param message: message to send. - :return: Dumped message. - """ - del value["labels"] - return super().dumpb(value) diff --git a/tests/conftest.py b/tests/conftest.py index 1bf01ae..c990036 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,6 +27,5 @@ def mock() -> MagicMock: @pytest.fixture() -@pytest.mark.anyio async def event() -> asyncio.Event: return asyncio.Event() diff --git a/tests/testcase.py b/tests/testcase.py index 2374bfb..2aa958f 100644 --- a/tests/testcase.py +++ b/tests/testcase.py @@ -44,7 +44,7 @@ async def handler(msg: str) -> None: **{self.subj_name: subject}, schedule=[ { - "time": datetime.utcnow(), + "time": datetime.utcnow(), # old python compat }, ], )