Skip to content

Commit

Permalink
Refactor for complexity
Browse files Browse the repository at this point in the history
  • Loading branch information
djspstfc committed Aug 16, 2024
1 parent 02fb251 commit 06d0fd5
Showing 1 changed file with 32 additions and 29 deletions.
61 changes: 32 additions & 29 deletions esgf-consumer/esgf_consumer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,35 +57,7 @@ async def consume(settings: Settings) -> None:
logger.critical("Received message: %s", msg)
event = KafkaEvent.model_validate_json(msg.value.decode("utf-8"))

await ensure_collection(
settings.stac_server, event.data.payload.collection_id, client
)
logger.critical(
"Collection %s confirmed on %s",
event.data.payload.collection_id,
settings.stac_server,
)

match event.data.payload:
case CreatePayload():
await create_item(
event.data.payload.collection_id,
event.data.payload.item,
settings,
client,
)
logger.critical(
"Item %s created.", event.data.payload.item.id
)

case UpdatePayload():
raise ESGFConsumerNotImplementedPayloadError

case RevokePayload():
raise ESGFConsumerNotImplementedPayloadError

case _:
raise ESGFConsumerUnknownPayloadError
await _handle_message(client, event, settings)

except ESGFConsumerUnknownPayloadError:
logger.exception("Received a valid but unknown payload")
Expand Down Expand Up @@ -159,5 +131,36 @@ async def consume(settings: Settings) -> None:
return None


async def _handle_message(
client: httpx.AsyncClient, event: KafkaEvent, settings: Settings
) -> None:
await ensure_collection(
settings.stac_server, event.data.payload.collection_id, client
)
logger.critical(
"Collection %s confirmed on %s",
event.data.payload.collection_id,
settings.stac_server,
)
match event.data.payload:
case CreatePayload():
await create_item(
event.data.payload.collection_id,
event.data.payload.item,
settings,
client,
)
logger.critical("Item %s created.", event.data.payload.item.id)

case UpdatePayload():
raise ESGFConsumerNotImplementedPayloadError

case RevokePayload():
raise ESGFConsumerNotImplementedPayloadError

case _:
raise ESGFConsumerUnknownPayloadError


if __name__ == "__main__":
asyncio.run(consume(Settings()), debug=True)

0 comments on commit 06d0fd5

Please sign in to comment.