diff --git a/.github/workflows/esgf-api-ingest-check.yml b/.github/workflows/esgf-api-ingest-check.yml index e065f0d..bd2cc0e 100644 --- a/.github/workflows/esgf-api-ingest-check.yml +++ b/.github/workflows/esgf-api-ingest-check.yml @@ -51,10 +51,10 @@ jobs: cd esgf-api-ingest pip install poetry poetry self add poetry-audit-plugin - poetry audit + poetry audit --ignore-code=CVE-2019-8341 - name: Run xenon run: | cd esgf-api-ingest pip install poetry poetry install --only xenon - poetry audit \ No newline at end of file + poetry run xenon . -a A -b A -m A \ No newline at end of file diff --git a/.github/workflows/esgf-consumer-check.yml b/.github/workflows/esgf-consumer-check.yml index cef162d..bddfc7c 100644 --- a/.github/workflows/esgf-consumer-check.yml +++ b/.github/workflows/esgf-consumer-check.yml @@ -57,4 +57,4 @@ jobs: cd esgf-consumer pip install poetry poetry install --only xenon - poetry audit \ No newline at end of file + poetry run xenon . -a A -b A -m A \ No newline at end of file diff --git a/.github/workflows/esgf-generator-check.yml b/.github/workflows/esgf-generator-check.yml index a33d850..d28bffc 100644 --- a/.github/workflows/esgf-generator-check.yml +++ b/.github/workflows/esgf-generator-check.yml @@ -57,4 +57,4 @@ jobs: cd esgf-generator pip install poetry poetry install --only xenon - poetry audit \ No newline at end of file + poetry run xenon . -a A -b A -m A \ No newline at end of file diff --git a/.github/workflows/esgf-playground-utils-check.yml b/.github/workflows/esgf-playground-utils-check.yml index cfc0683..c4a47ce 100644 --- a/.github/workflows/esgf-playground-utils-check.yml +++ b/.github/workflows/esgf-playground-utils-check.yml @@ -57,4 +57,4 @@ jobs: cd esgf-playground-utils pip install poetry poetry install --only xenon - poetry audit \ No newline at end of file + poetry run xenon . -a A -b A -m A \ No newline at end of file diff --git a/esgf-api-ingest/esgf_api_ingest/main.py b/esgf-api-ingest/esgf_api_ingest/main.py index 99daf84..5db3b94 100644 --- a/esgf-api-ingest/esgf_api_ingest/main.py +++ b/esgf-api-ingest/esgf_api_ingest/main.py @@ -1,21 +1,13 @@ -import asyncio -import json import logging -import os from contextlib import asynccontextmanager from datetime import datetime -from typing import Any, AsyncGenerator, NoReturn, Optional, Union +from typing import Any, AsyncGenerator, Optional, Union import aiokafka from esgf_playground_utils.config.kafka import Settings -from esgf_playground_utlis.models.kafka import ( - Auth, - CreatePayload, - Data, - KafkaEvent, - Metadata, - Publisher, -) +from esgf_playground_utlis.models.kafka import (Auth, CreatePayload, Data, + KafkaEvent, Metadata, + Publisher) from fastapi import FastAPI, HTTPException from stac_pydantic.item import Item from stac_pydantic.item_collection import ItemCollection @@ -53,6 +45,9 @@ async def post_message(event: KafkaEvent) -> None: value = event.model_dump_json().encode("utf8") topic = get_topic(event.data.payload.item) + if producer is None: + raise Exception("Kafka producer is not initialized") + await producer.send_and_wait(topic, value) except Exception as exc: raise HTTPException(status_code=500, detail=repr(exc)) from exc @@ -67,7 +62,7 @@ async def post_item(collection_id: str, item: Item) -> None: auth=auth, publisher=publisher, time=datetime.now(), schema_version="1.0.0" ) event = KafkaEvent(metadata=metadata, data=data) - post_message(event) + await post_message(event) @app.post("/{collection_id}/items", status_code=202) diff --git a/esgf-consumer/esgf_consumer/__init__.py b/esgf-consumer/esgf_consumer/__init__.py index 6954d3f..e6f3c47 100644 --- a/esgf-consumer/esgf_consumer/__init__.py +++ b/esgf-consumer/esgf_consumer/__init__.py @@ -8,13 +8,14 @@ import httpx from aiokafka.errors import KafkaError +from esgf_playground_utils.models.kafka import Error, ErrorType, KafkaEvent +from pydantic import ValidationError + from esgf_consumer.collection import ensure_collection from esgf_consumer.config import Settings from esgf_consumer.consumers import get_consumer from esgf_consumer.items import create_item from esgf_consumer.producers import get_producer -from esgf_playground_utils.models.kafka import Error, ErrorType, KafkaEvent -from pydantic import ValidationError logging.getLogger().setLevel(logging.DEBUG) logger = logging.getLogger(__name__) diff --git a/esgf-consumer/esgf_consumer/cli.py b/esgf-consumer/esgf_consumer/cli.py index acb4f25..7e74a7d 100644 --- a/esgf-consumer/esgf_consumer/cli.py +++ b/esgf-consumer/esgf_consumer/cli.py @@ -2,9 +2,10 @@ import logging import click -from esgf_consumer import consume from esgf_playgroud_utils.config.kafka import Settings +from esgf_consumer import consume + logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) diff --git a/esgf-consumer/esgf_consumer/items.py b/esgf-consumer/esgf_consumer/items.py index b709f3b..e503678 100644 --- a/esgf-consumer/esgf_consumer/items.py +++ b/esgf-consumer/esgf_consumer/items.py @@ -2,9 +2,10 @@ from urllib.parse import urljoin import httpx -from esgf_consumer.config import Settings from stac_pydantic.item import Item +from esgf_consumer.config import Settings + logger = logging.getLogger(__name__)