From d55e08645f814fc09a43bebdd67faeb58ed3a126 Mon Sep 17 00:00:00 2001 From: Edbo849 Date: Mon, 14 Oct 2024 11:29:19 +0100 Subject: [PATCH] Added mongoDB persistent message dump --- docker-compose.yml | 43 ++++++++ esgf-consumer/Dockerfile | 2 +- esgf-consumer/esgf_consumer/__init__.py | 2 +- esgf-consumer/esgf_consumer/mongo_consumer.py | 88 +++++++++++++++ esgf-consumer/poetry.lock | 103 +++++++++++++++++- esgf-consumer/pyproject.toml | 1 + 6 files changed, 236 insertions(+), 3 deletions(-) create mode 100644 esgf-consumer/esgf_consumer/mongo_consumer.py diff --git a/docker-compose.yml b/docker-compose.yml index baa1b88..874ad68 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -60,6 +60,7 @@ services: CONSUMER_GROUP: "esgf-east" BOOTSTRAP_SERVERS: '["kafka1:19092"]' STAC_SERVER: "http://app-elasticsearch-east:8080" + SCRIPT: "esgf_consumer/__init__.py" depends_on: kafka1: condition: service_healthy @@ -123,6 +124,7 @@ services: CONSUMER_GROUP: "esgf-west" BOOTSTRAP_SERVERS: '["kafka1:19092"]' STAC_SERVER: "http://app-elasticsearch-west:8080" + SCRIPT: "esgf_consumer/__init__.py" depends_on: kafka1: condition: service_healthy @@ -187,6 +189,7 @@ services: BOOTSTRAP_SERVERS: '["kafka1:19092"]' KAFKA_TOPICS: '.*\.historical\..*' STAC_SERVER: "http://app-elasticsearch-secondary:8080" + SCRIPT: "esgf_consumer/__init__.py" depends_on: kafka1: condition: service_healthy @@ -372,6 +375,46 @@ services: service: conduktor-console file: conduktor.yml + mongo: + image: mongo + container_name: mongo + environment: + MONGO_INITDB_ROOT_USERNAME: root + MONGO_INITDB_ROOT_PASSWORD: example + ports: + - "27017:27017" + volumes: + - mongo_data:/data/db + + mongo-express: + image: mongo-express + container_name: mongo-express + environment: + ME_CONFIG_MONGODB_ADMINUSERNAME: root + ME_CONFIG_MONGODB_ADMINPASSWORD: example + ME_CONFIG_MONGODB_SERVER: mongo + ME_CONFIG_BASICAUTH: "false" + ports: + - "8085:8081" + + mongo-consumer: + build: + context: esgf-consumer + dockerfile: Dockerfile + depends_on: + kafka1: + condition: service_healthy + mongo: + condition: service_started + environment: + BOOTSTRAP_SERVERS: '["kafka1:19092"]' + MONGO_URI: mongodb://root:example@mongo:27017/ + MONGODB_DATABASE: esgf_playground_db + MONGODB_COLLECTION: esgf_data_collection + SCRIPT: "esgf_consumer/mongo_consumer.py" + + volumes: pg_data: {} conduktor_data: {} + mongo_data: {} \ No newline at end of file diff --git a/esgf-consumer/Dockerfile b/esgf-consumer/Dockerfile index c6b7d17..e5625f2 100644 --- a/esgf-consumer/Dockerfile +++ b/esgf-consumer/Dockerfile @@ -9,4 +9,4 @@ RUN pip install poetry RUN poetry install --only main -ENTRYPOINT ["poetry", "run", "python", "esgf_consumer/__init__.py"] \ No newline at end of file +ENTRYPOINT ["sh", "-c", "poetry run python $SCRIPT"] \ No newline at end of file diff --git a/esgf-consumer/esgf_consumer/__init__.py b/esgf-consumer/esgf_consumer/__init__.py index 1f787f4..cf0db17 100644 --- a/esgf-consumer/esgf_consumer/__init__.py +++ b/esgf-consumer/esgf_consumer/__init__.py @@ -1,5 +1,5 @@ """ -Prototype consumer for ESGF. Takes events frm KAFKA and sends them to an ESGF STAC index. +Prototype consumer for ESGF. Takes events from KAFKA and sends them to an ESGF STAC index. """ import asyncio diff --git a/esgf-consumer/esgf_consumer/mongo_consumer.py b/esgf-consumer/esgf_consumer/mongo_consumer.py new file mode 100644 index 0000000..572631d --- /dev/null +++ b/esgf-consumer/esgf_consumer/mongo_consumer.py @@ -0,0 +1,88 @@ +import asyncio +import logging +import traceback +from typing import Any + +from aiokafka.errors import KafkaError +from consumers import get_consumer +from esgf_playground_utils.config.kafka import Settings +from esgf_playground_utils.models.kafka import Error, ErrorType, KafkaEvent +from pydantic import ValidationError +from pymongo import MongoClient + +logging.getLogger().setLevel(logging.DEBUG) +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + + +async def consume_to_mongo(settings: Settings) -> None: + logger.critical("Configuration: %s", settings) + logger.critical("Waiting 10s before connection to Kafka...") + await asyncio.sleep(10) + + logger.critical("Starting consumer...") + consumer = await get_consumer(settings) + logger.critical("Consumer started.") + + logger.critical("Connecting to MongoDB...") + client: MongoClient[Any] = MongoClient("mongodb://root:example@mongo:27017/") + db = client["esgf_playground_db"] + collection = db["esgf_data_collection"] + logger.critical("Connected to MongoDB.") + + try: + async for msg in consumer: + try: + logger.critical("Received message: %s", msg) + event = KafkaEvent.model_validate_json(msg.value.decode("utf-8")) + + collection.insert_one(event.model_dump()) + logger.critical("Message persisted to MongoDB.") + + except ValidationError: + logger.exception("Payload error occurred") + error = Error( + original_payload=str(msg), + node=settings.consumer_group, + traceback=traceback.format_exc(), + error_type=ErrorType.payload, + ) + await consumer.send_and_wait( + "esgf_error", error.model_dump_json().encode() + ) + + except KafkaError: + logger.exception("Kafka exception occurred") + error = Error( + original_payload=event.model_dump_json(), + node=settings.consumer_group, + traceback=traceback.format_exc(), + error_type=ErrorType.kafka, + ) + await consumer.send_and_wait( + "esgf_error", error.model_dump_json().encode() + ) + + except Exception as e: + logger.exception("Failed to process message: %s", e) + error = Error( + original_payload=event.model_dump_json(), + node=settings.consumer_group, + traceback=traceback.format_exc(), + error_type=ErrorType.unknown, + ) + await consumer.send_and_wait( + "esgf_error", error.model_dump_json().encode() + ) + + finally: + await consumer.stop() + logger.critical("Consumer stopped.") + client.close() + logger.critical("MongoDB client closed.") + + return None + + +if __name__ == "__main__": + asyncio.run(consume_to_mongo(Settings()), debug=True) diff --git a/esgf-consumer/poetry.lock b/esgf-consumer/poetry.lock index 6f03337..9396d13 100644 --- a/esgf-consumer/poetry.lock +++ b/esgf-consumer/poetry.lock @@ -297,6 +297,26 @@ files = [ {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] +[[package]] +name = "dnspython" +version = "2.6.1" +description = "DNS toolkit" +optional = false +python-versions = ">=3.8" +files = [ + {file = "dnspython-2.6.1-py3-none-any.whl", hash = "sha256:5ef3b9680161f6fa89daf8ad451b5f1a33b18ae8a1c6778cdf4b43f08c0a6e50"}, + {file = "dnspython-2.6.1.tar.gz", hash = "sha256:e8f0f9c23a7b7cb99ded64e6c3a6f3e701d78f50c55e002b839dea7225cff7cc"}, +] + +[package.extras] +dev = ["black (>=23.1.0)", "coverage (>=7.0)", "flake8 (>=7)", "mypy (>=1.8)", "pylint (>=3)", "pytest (>=7.4)", "pytest-cov (>=4.1.0)", "sphinx (>=7.2.0)", "twine (>=4.0.0)", "wheel (>=0.42.0)"] +dnssec = ["cryptography (>=41)"] +doh = ["h2 (>=4.1.0)", "httpcore (>=1.0.0)", "httpx (>=0.26.0)"] +doq = ["aioquic (>=0.9.25)"] +idna = ["idna (>=3.6)"] +trio = ["trio (>=0.23)"] +wmi = ["wmi (>=1.5.1)"] + [[package]] name = "esgf-playground-utils" version = "0.3.3" @@ -728,6 +748,87 @@ files = [ [package.extras] windows-terminal = ["colorama (>=0.4.6)"] +[[package]] +name = "pymongo" +version = "4.9.1" +description = "Python driver for MongoDB " +optional = false +python-versions = ">=3.8" +files = [ + {file = "pymongo-4.9.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:dc3d070d746ab79e9b393a5c236df20e56607389af2b79bf1bfe9a841117558e"}, + {file = "pymongo-4.9.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:fe709d05654c12fc513617c8d5c8d05b7e9cf1d5d94ada68add4e89530c867d2"}, + {file = "pymongo-4.9.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:aa4493f304b33c5d2ecee3055c98889ac6724d56f5f922d47420a45d0d4099c9"}, + {file = "pymongo-4.9.1-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f8e8b8deba6a4bff3dd5421071083219521c74d2acae0322de5c06f1a66c56af"}, + {file = "pymongo-4.9.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:e3645aff8419ca60f9ccd08966b2f6b0d78053f9f98a814d025426f1d874c19a"}, + {file = "pymongo-4.9.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:51dbc6251c6783dfcc7d657c346986d8bad7210989b2fe15de16db5204a8e7ae"}, + {file = "pymongo-4.9.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1d7aa9cc2d92e73bdb036c578ba019da94ea165eb147e691cd910a6fab7ce3b7"}, + {file = "pymongo-4.9.1-cp310-cp310-win32.whl", hash = "sha256:8b632e01617f2608880f7b9926f54a5f5ebb51631996e0540fff7fc7980663c9"}, + {file = "pymongo-4.9.1-cp310-cp310-win_amd64.whl", hash = "sha256:f05e34d401be871d7c87cb10727d49315444e4ded07ff876a595e4c23b7436da"}, + {file = "pymongo-4.9.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6bb3d5282278594753089dc7da48bfae4a7f337a2dd4d397eabb591c649e58d0"}, + {file = "pymongo-4.9.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:8f0d5258bc85a4e6b5bcae8160628168e71ec4625a58ceb53327c3280a0b6914"}, + {file = "pymongo-4.9.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:96462fb2175f740701d229f52018ea6e4adc4148c4112e6628bb359dd534a3df"}, + {file = "pymongo-4.9.1-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:286fb275267f0293364ba579f6354452599161f1902ad411061c7f744ab88328"}, + {file = "pymongo-4.9.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:4cddb51cead9700c4dccc916952bc0321b8d766bf782d374bfa0e93ef47c1d20"}, + {file = "pymongo-4.9.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1d79f20f9c7cbc1c708fb80b648b6fbd3220fd3437a9bd6017c1eb592e03b361"}, + {file = "pymongo-4.9.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:dd3352eaf578f8e9bdea7a5692910eedad1e8680f60726fc70e99c8af51a5449"}, + {file = "pymongo-4.9.1-cp311-cp311-win32.whl", hash = "sha256:ea3f0196e7c311b9944a609ac175bd91ab97952164a1246716fdd38d53ca3bcc"}, + {file = "pymongo-4.9.1-cp311-cp311-win_amd64.whl", hash = "sha256:b4c793db8457c856f333f396798470b9bfe405e17c307d581532c74cec70150c"}, + {file = "pymongo-4.9.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:47b4896544095d172c366dd4d4ea1da6b0ab1a77d8416897cc1801e2421b1e67"}, + {file = "pymongo-4.9.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:fbb1c7dfcf6c44e9e1928290631c7603817991cdf570691c9e15fca594918435"}, + {file = "pymongo-4.9.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a7689da1d1b444284e4ea9ab2eb64a15307b6b795918c0f3cd7774dd1d8a7556"}, + {file = "pymongo-4.9.1-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:7f962d74201c772555f7a78792fed820a5ea76db5c7ee6cf43748e411b44e430"}, + {file = "pymongo-4.9.1-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:08fbab69f3fb6f8088c81f4c4a8abd84a99c132034f5e27e47f894bbcb6bf439"}, + {file = "pymongo-4.9.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4327c0d9bd616b8289691360f2d4a09a72fe35479795832eae0d4ff78af53923"}, + {file = "pymongo-4.9.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:34e4993ae78be56f9e27a141168a1ab78253576fa3e893fa335a719ce204c3ef"}, + {file = "pymongo-4.9.1-cp312-cp312-win32.whl", hash = "sha256:e1f346811d4a2369f88ab7a6f886fa9c3bbc9ed4e4f4a3becca8717a73d465cb"}, + {file = "pymongo-4.9.1-cp312-cp312-win_amd64.whl", hash = "sha256:a2b12c74cfd90147babb77f9728646bcedfdbd2bd2a5b4130a00e3a0af1a3d34"}, + {file = "pymongo-4.9.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:a40ea8bc9cffb61c5c9c426c430d22235e085e610ee81ae075ddf51f12f76236"}, + {file = "pymongo-4.9.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:75d5974f874acdb2f125bdbe785045b23a39ecce1d3143dd5712800c7b6d25eb"}, + {file = "pymongo-4.9.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f23a046531030318622414f21198e232cf93c5640da9a80b45596a059c8cc090"}, + {file = "pymongo-4.9.1-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:91b1a92214c3912af5467f77c2f6435cd76f6de64c70cba7bb4ee43eba7f459e"}, + {file = "pymongo-4.9.1-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3a846423c4535428f69a90a1451df3718bc59f0c4ab685b9e96d3071951e0be4"}, + {file = "pymongo-4.9.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d476d91a5c9e6c37bc8ec3fb294e1c01d95736ccf01a59bb1540fe2f710f826e"}, + {file = "pymongo-4.9.1-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:172d8ba0f567e351a18765db23dab7dbcfdffd91a8788d90d46b350f80a40781"}, + {file = "pymongo-4.9.1-cp313-cp313-win32.whl", hash = "sha256:95418e334629440f70fe5ceeefc6cbbd50defb566901c8d68179ffbaec8d5f01"}, + {file = "pymongo-4.9.1-cp313-cp313-win_amd64.whl", hash = "sha256:1dfd2aa30174d36a3ef1dae4ee4c89710c2d65cac52ce6e13f17c710edbd61cf"}, + {file = "pymongo-4.9.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:c4204fad54830a3173a5c939cd052d0561fba03dba7e0ff6852fd631f3314aa4"}, + {file = "pymongo-4.9.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:375765ec81b1f0a26d08928afea0c3dff897c36080a090be53fc7b70cc51d497"}, + {file = "pymongo-4.9.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4d1b959a3dda0775d9111622ee47ad47772aed3a9da2e7d5f2f513fa68175dea"}, + {file = "pymongo-4.9.1-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:42c19d2b094cdd0ead7dbb38860bbe8268c140334ce55d8b39204ddb4ebd4904"}, + {file = "pymongo-4.9.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1fac1def9e9073f1c80198c99f0ec39c2528236c8912d96d7fd3b0237f4c523a"}, + {file = "pymongo-4.9.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b347052d510989d1f52b8553b31297f21cf74bd9f6aed71ee84e563492f4ff17"}, + {file = "pymongo-4.9.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1b4b961fce213f2bcdc92268f85111a3668c61b9b4d4e7ece27dce3a137cfcbd"}, + {file = "pymongo-4.9.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:a0b10cf51ec14a487c94709d294c00e1fb6a0a4c38cdc3acfb2ced5ef60972a0"}, + {file = "pymongo-4.9.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:679b8d55854da7c7fdb82aa5e092ab4de0144daf6758defed8ab00ff9ce05360"}, + {file = "pymongo-4.9.1-cp38-cp38-win32.whl", hash = "sha256:432ad395d2233056b042ccc73234e7136aa65d944d6bd8b5138394bd38aaff79"}, + {file = "pymongo-4.9.1-cp38-cp38-win_amd64.whl", hash = "sha256:9fbe9fad27619ac4cfda5df0ade26a99906da7dfe7b01deddc25997eb1804e4c"}, + {file = "pymongo-4.9.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:99b611ff75b5d9e17183dcf9584a7b04f9db07e51a162f23ea05e485e0735c0a"}, + {file = "pymongo-4.9.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:8089003a99127f917bdbeec177d41cef019cda8ec70534c1018cb60aacd23c2a"}, + {file = "pymongo-4.9.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9d78adf25967c06298c7e488f4cfab79a390fc32c2b1d428613976f99031603d"}, + {file = "pymongo-4.9.1-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:56877cfcdf7dfc5c6408e4551ec0d6d65ebbca4d744a0bc90400f09ef6bbcc8a"}, + {file = "pymongo-4.9.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:16d2efe559d0d96bc0b74b3ff76701ad6f6e1a65f6581b573dcacc29158131c8"}, + {file = "pymongo-4.9.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f838f613e74b4dad8ace0d90f42346005bece4eda5bf6d389cfadb8322d39316"}, + {file = "pymongo-4.9.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:db5b299e11284f8d82ce2983d8e19fcc28f98f902a179709ef1982b4cca6f8b8"}, + {file = "pymongo-4.9.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:b23211c031b45d0f32de83ab7d77f9c26f1025c2d2c91463a5d8594a16103655"}, + {file = "pymongo-4.9.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:687cf70e096381bc65b4273a6a9319617618f7ace65caffc356e1099c4a68511"}, + {file = "pymongo-4.9.1-cp39-cp39-win32.whl", hash = "sha256:e02b03e3815b80a63e773e4c32aed3cf5633d406f376477be74550295c211256"}, + {file = "pymongo-4.9.1-cp39-cp39-win_amd64.whl", hash = "sha256:0492ef43f3342354cf581712e431621c221f60c877ebded84e3f3e53b71bbbe0"}, + {file = "pymongo-4.9.1.tar.gz", hash = "sha256:b7f2d34390acf60e229c30037d1473fcf69f4536cd7f48f6f78c0c931c61c505"}, +] + +[package.dependencies] +dnspython = ">=1.16.0,<3.0.0" + +[package.extras] +aws = ["pymongo-auth-aws (>=1.1.0,<2.0.0)"] +docs = ["furo (==2023.9.10)", "readthedocs-sphinx-search (>=0.3,<1.0)", "sphinx (>=5.3,<8)", "sphinx-autobuild (>=2020.9.1)", "sphinx-rtd-theme (>=2,<3)", "sphinxcontrib-shellcheck (>=1,<2)"] +encryption = ["certifi", "pymongo-auth-aws (>=1.1.0,<2.0.0)", "pymongocrypt (>=1.10.0,<2.0.0)"] +gssapi = ["pykerberos", "winkerberos (>=0.5.0)"] +ocsp = ["certifi", "cryptography (>=2.5)", "pyopenssl (>=17.2.0)", "requests (<3.0.0)", "service-identity (>=18.1.0)"] +snappy = ["python-snappy"] +test = ["pytest (>=8.2)", "pytest-asyncio (>=0.24.0)"] +zstd = ["zstandard"] + [[package]] name = "python-dotenv" version = "1.0.1" @@ -992,4 +1093,4 @@ requests = ">=2.0,<3.0" [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "96318833abb02edd1b3ca4a3858a63b44548bc9318c7b5661a2e29b65669cba9" +content-hash = "367fda3bcf880006efe9a255183a604e05c243bbd2dbd84c513923cb89eee995" diff --git a/esgf-consumer/pyproject.toml b/esgf-consumer/pyproject.toml index 93c667a..abc6322 100644 --- a/esgf-consumer/pyproject.toml +++ b/esgf-consumer/pyproject.toml @@ -11,6 +11,7 @@ httpx = "^0.27.0" click = "^8.1.7" aiokafka = "^0.11.0" esgf-playground-utils = "^0.3.0" +pymongo = "^4.9.1" [tool.poetry.group.black.dependencies] black = "^24.4.2"