Skip to content

Commit

Permalink
Added mongoDB persistent message dump
Browse files Browse the repository at this point in the history
  • Loading branch information
Edbo849 committed Oct 14, 2024
1 parent 495b5c9 commit d55e086
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 3 deletions.
43 changes: 43 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ services:
CONSUMER_GROUP: "esgf-east"
BOOTSTRAP_SERVERS: '["kafka1:19092"]'
STAC_SERVER: "http://app-elasticsearch-east:8080"
SCRIPT: "esgf_consumer/__init__.py"
depends_on:
kafka1:
condition: service_healthy
Expand Down Expand Up @@ -123,6 +124,7 @@ services:
CONSUMER_GROUP: "esgf-west"
BOOTSTRAP_SERVERS: '["kafka1:19092"]'
STAC_SERVER: "http://app-elasticsearch-west:8080"
SCRIPT: "esgf_consumer/__init__.py"
depends_on:
kafka1:
condition: service_healthy
Expand Down Expand Up @@ -187,6 +189,7 @@ services:
BOOTSTRAP_SERVERS: '["kafka1:19092"]'
KAFKA_TOPICS: '.*\.historical\..*'
STAC_SERVER: "http://app-elasticsearch-secondary:8080"
SCRIPT: "esgf_consumer/__init__.py"
depends_on:
kafka1:
condition: service_healthy
Expand Down Expand Up @@ -372,6 +375,46 @@ services:
service: conduktor-console
file: conduktor.yml

# MongoDB server used as the persistent store for dumped messages.
mongo:
image: mongo
container_name: mongo
environment:
# NOTE(review): root credentials are committed in plain text; presumably
# intended for local development only - confirm before any shared deployment.
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: example
ports:
# Publishes container port 27017 on host port 27017.
- "27017:27017"
volumes:
# Named volume `mongo_data` (declared under the top-level `volumes` key)
# mounted at /data/db.
- mongo_data:/data/db

# mongo-express container configured against the `mongo` service above.
mongo-express:
image: mongo-express
container_name: mongo-express
environment:
# Same root credentials as the `mongo` service.
ME_CONFIG_MONGODB_ADMINUSERNAME: root
ME_CONFIG_MONGODB_ADMINPASSWORD: example
ME_CONFIG_MONGODB_SERVER: mongo
# NOTE(review): presumably turns off the UI's HTTP basic auth - confirm this
# is acceptable, since the port below is published on the host.
ME_CONFIG_BASICAUTH: "false"
ports:
# Container port 8081 is published on host port 8085.
- "8085:8081"

# Consumer built from the esgf-consumer image that runs mongo_consumer.py.
mongo-consumer:
build:
context: esgf-consumer
dockerfile: Dockerfile
depends_on:
kafka1:
condition: service_healthy
mongo:
# `service_started` only waits for the container to start, not for MongoDB
# to accept connections.
condition: service_started
environment:
BOOTSTRAP_SERVERS: '["kafka1:19092"]'
# NOTE(review): confirm mongo_consumer.py actually reads the three Mongo
# variables below rather than using built-in values.
MONGO_URI: mongodb://root:example@mongo:27017/
MONGODB_DATABASE: esgf_playground_db
MONGODB_COLLECTION: esgf_data_collection
# Script path substituted into the image ENTRYPOINT (`poetry run python $SCRIPT`).
SCRIPT: "esgf_consumer/mongo_consumer.py"


volumes:
pg_data: {}
conduktor_data: {}
mongo_data: {}
2 changes: 1 addition & 1 deletion esgf-consumer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ RUN pip install poetry

RUN poetry install --only main

ENTRYPOINT ["poetry", "run", "python", "esgf_consumer/__init__.py"]
# Run the script named by $SCRIPT, defaulting to the original consumer so the
# image still works when SCRIPT is unset. `exec` replaces the wrapper shell so
# that stop signals (e.g. SIGTERM from `docker stop`) reach the process directly.
ENTRYPOINT ["sh", "-c", "exec poetry run python \"${SCRIPT:-esgf_consumer/__init__.py}\""]
2 changes: 1 addition & 1 deletion esgf-consumer/esgf_consumer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Prototype consumer for ESGF. Takes events frm KAFKA and sends them to an ESGF STAC index.
Prototype consumer for ESGF. Takes events from KAFKA and sends them to an ESGF STAC index.
"""

import asyncio
Expand Down
88 changes: 88 additions & 0 deletions esgf-consumer/esgf_consumer/mongo_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import asyncio
import logging
import os
import traceback
from typing import Any

from aiokafka.errors import KafkaError
from consumers import get_consumer
from esgf_playground_utils.config.kafka import Settings
from esgf_playground_utils.models.kafka import Error, ErrorType, KafkaEvent
from pydantic import ValidationError
from pymongo import MongoClient

# Module logger plus the root logger are both opened up to DEBUG so that
# every record emitted by this script (and by libraries) is let through.
logger = logging.getLogger(__name__)
logging.getLogger().setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)


async def consume_to_mongo(settings: Settings) -> None:
    """Consume Kafka events and persist each validated event to MongoDB.

    MongoDB connection details are read from the environment so they follow
    the deployment configuration. The defaults are the values that were
    previously hard-coded here:

    * ``MONGO_URI`` (default ``mongodb://root:example@mongo:27017/``)
    * ``MONGODB_DATABASE`` (default ``esgf_playground_db``)
    * ``MONGODB_COLLECTION`` (default ``esgf_data_collection``)

    A message that fails validation, or whose processing raises, is reported
    as an ``Error`` record on the ``esgf_error`` topic, after which the loop
    continues with the next message.

    Args:
        settings: Kafka settings passed to ``get_consumer``; its
            ``consumer_group`` is recorded as the ``node`` of error reports.
    """
    logger.critical("Configuration: %s", settings)
    logger.critical("Waiting 10s before connection to Kafka...")
    await asyncio.sleep(10)

    logger.critical("Starting consumer...")
    consumer = await get_consumer(settings)
    logger.critical("Consumer started.")

    mongo_uri = os.environ.get(
        "MONGO_URI", "mongodb://root:example@mongo:27017/"
    )
    database_name = os.environ.get("MONGODB_DATABASE", "esgf_playground_db")
    collection_name = os.environ.get(
        "MONGODB_COLLECTION", "esgf_data_collection"
    )

    # Created inside the try block so that a failure while building the client
    # (e.g. a malformed URI) still stops the already-started consumer.
    client: MongoClient[Any] | None = None
    try:
        logger.critical("Connecting to MongoDB...")
        client = MongoClient(mongo_uri)
        collection = client[database_name][collection_name]
        logger.critical("Connected to MongoDB.")

        async for msg in consumer:
            # Reset per message: the error handlers below must never see the
            # event of a previous iteration (or an unbound name) when the
            # failure happens before validation completes.
            event: KafkaEvent | None = None
            try:
                logger.critical("Received message: %s", msg)
                event = KafkaEvent.model_validate_json(msg.value.decode("utf-8"))

                # NOTE(review): insert_one is a synchronous pymongo call and
                # blocks the event loop while it runs.
                collection.insert_one(event.model_dump())
                logger.critical("Message persisted to MongoDB.")

            except ValidationError:
                logger.exception("Payload error occurred")
                error = Error(
                    original_payload=str(msg),
                    node=settings.consumer_group,
                    traceback=traceback.format_exc(),
                    error_type=ErrorType.payload,
                )
                # NOTE(review): send_and_wait is a producer-style call in
                # aiokafka; confirm the object returned by get_consumer
                # provides it.
                await consumer.send_and_wait(
                    "esgf_error", error.model_dump_json().encode()
                )

            except KafkaError:
                logger.exception("Kafka exception occurred")
                # Fall back to the raw message when no event was parsed.
                payload = event.model_dump_json() if event is not None else str(msg)
                error = Error(
                    original_payload=payload,
                    node=settings.consumer_group,
                    traceback=traceback.format_exc(),
                    error_type=ErrorType.kafka,
                )
                await consumer.send_and_wait(
                    "esgf_error", error.model_dump_json().encode()
                )

            except Exception as e:
                logger.exception("Failed to process message: %s", e)
                # Fall back to the raw message when no event was parsed.
                payload = event.model_dump_json() if event is not None else str(msg)
                error = Error(
                    original_payload=payload,
                    node=settings.consumer_group,
                    traceback=traceback.format_exc(),
                    error_type=ErrorType.unknown,
                )
                await consumer.send_and_wait(
                    "esgf_error", error.model_dump_json().encode()
                )

    finally:
        await consumer.stop()
        logger.critical("Consumer stopped.")
        if client is not None:
            client.close()
            logger.critical("MongoDB client closed.")


if __name__ == "__main__":
    # Script entry point: build the settings, then drive the consumer
    # coroutine to completion on a debug-enabled event loop.
    runtime_settings = Settings()
    asyncio.run(consume_to_mongo(runtime_settings), debug=True)
103 changes: 102 additions & 1 deletion esgf-consumer/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions esgf-consumer/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ httpx = "^0.27.0"
click = "^8.1.7"
aiokafka = "^0.11.0"
esgf-playground-utils = "^0.3.0"
pymongo = "^4.9.1"

[tool.poetry.group.black.dependencies]
black = "^24.4.2"
Expand Down

0 comments on commit d55e086

Please sign in to comment.