Skip to content

Commit

Permalink
Added Soft Delete and esgf_generator tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Edbo849 committed Sep 19, 2024
1 parent cbd91e1 commit 33d46eb
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 28 deletions.
33 changes: 24 additions & 9 deletions esgf-consumer/esgf_consumer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,31 @@

import httpx
from aiokafka.errors import KafkaError
from esgf_consumer.collection import ensure_collection
from esgf_consumer.consumers import get_consumer
from esgf_consumer.exceptions import (
ESGFConsumerNotImplementedPayloadError,
ESGFConsumerUnknownPayloadError,
)
from esgf_consumer.items import (
create_item,
hard_delete_item,
soft_delete_item,
update_item,
)
from esgf_consumer.producers import get_producer
from esgf_playground_utils.config.kafka import Settings
from esgf_playground_utils.models.kafka import (
CreatePayload,
Error,
ErrorType,
KafkaEvent,
PartialUpdatePayload,
RevokePayload,
UpdatePayload,
)
from pydantic import ValidationError

from esgf_consumer.collection import ensure_collection
from esgf_consumer.consumers import get_consumer
from esgf_consumer.exceptions import (
ESGFConsumerNotImplementedPayloadError,
ESGFConsumerUnknownPayloadError,
)
from esgf_consumer.items import create_item, hard_delete_item, update_item
from esgf_consumer.producers import get_producer

logging.getLogger().setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -171,6 +176,16 @@ async def _handle_message(
)
logger.critical("Item %s deleted.", event.data.payload.item_id)

case PartialUpdatePayload(method="PATCH"):
await soft_delete_item(
event.data.payload.collection_id,
event.data.payload.item,
event.data.payload.item_id,
settings,
client,
)
logger.critical("Item %s soft deleted.", event.data.payload.item_id)

case _:
raise ESGFConsumerUnknownPayloadError

Expand Down
23 changes: 23 additions & 0 deletions esgf-consumer/esgf_consumer/items.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
import logging
from typing import Any, Dict
from urllib.parse import urljoin

import httpx
Expand Down Expand Up @@ -61,3 +63,24 @@ async def hard_delete_item(
logger.critical("Item not deleted: %s", result.content)

return None


async def soft_delete_item(
collection_id: str,
item: dict,
item_id: str,
settings: Settings,
client: httpx.AsyncClient,
) -> None:
path = f"collections/{collection_id}/items/{item_id}"
url = urljoin(str(settings.stac_server), path)

logger.critical("Soft deleting %s at %s", item_id, url)
result = await client.patch(url, content=json.dumps(item), timeout=5)
if result.status_code < 300:
logger.critical("Item Soft Deleted")

else:
logger.critical("Item not deleted: %s", result.content)

return None
11 changes: 5 additions & 6 deletions esgf-generator/esgf_generator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@
from urllib.parse import urljoin

import httpx
from esgf_playground_utils.models.item import ESGFItem, ESGFItemProperties
from polyfactory import PostGenerated
from polyfactory.factories.pydantic_factory import ModelFactory
from polyfactory.fields import Use
from typing_extensions import ParamSpec

from esgf_generator.data import CHOICES
from esgf_generator.static_generators import (
generate_datetime,
generate_geometry,
instance_id,
)
from esgf_playground_utils.models.item import ESGFItem, ESGFItemProperties
from polyfactory import PostGenerated
from polyfactory.factories.pydantic_factory import ModelFactory
from polyfactory.fields import Use
from typing_extensions import ParamSpec

API_URL = "http://ceda.stac.ac.uk"
START_DATETIME = datetime.fromisoformat("1900-01-01T00:00:00").replace(
Expand Down
91 changes: 87 additions & 4 deletions esgf-generator/esgf_generator/cli.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import json
import random
import time
from typing import Literal

import click
import httpx
from esgf_playground_utils.models.item import ESGFItem

from esgf_generator import ESGFItemFactory
from esgf_playground_utils.models.item import ESGFItem

NODE_PORTS = {"east": 9050, "west": 9051}

Expand Down Expand Up @@ -128,7 +128,6 @@ def esgf_update(
if result.status_code >= 300:
raise Exception(result.content)

click.echo()
click.echo("Done")


Expand Down Expand Up @@ -168,8 +167,92 @@ def esgf_delete(
f"http://localhost:{NODE_PORTS[node]}/{collection_id}/items/{item_id}"
)
else:
raise NotImplementedError("Soft delete is not implemented yet.")
click.echo("Soft deleting item")
click.echo()

content = {"Properties": {"retracted": True}}
result = client.patch(
f"http://localhost:{NODE_PORTS[node]}/{collection_id}/items/{item_id}",
content=json.dumps(content),
)
if result.status_code >= 300:
raise Exception(result.content)

click.echo("Done")


@click.command()
def esgf_generator_test() -> None:
"""
Generate a number of ESGF items.
COUNT is the number of items to generate.
"""
click.echo(f"Producing a STAC record to test")
click.echo()

data = ESGFItemFactory().batch(
1,
stac_extensions=[],
)
instance = data[0]
publish = True

if publish:
click.echo(f"Sending {instance.properties.instance_id} to ESGF node 'east'")
click.echo()

with httpx.Client() as client:

# Create Item
result = client.post(
f"http://localhost:9050/{instance.collection}/items",
content=instance.model_dump_json(),
)
if result.status_code >= 300:
click.echo("Test [1/3]: Failed")
click.echo(
f"Failed to create item, Error: \n{result.content}, Status: {result.status_code}\n"
)

else:
click.echo("Test [1/3]: Passed")
click.echo(
f"Created item {instance.properties.instance_id} in collection {instance.collection}, Status: {result.status_code}\n"
)

# Replication
patch_data = {"node": "Fake Node"}
patch_result = client.patch(
f"http://localhost:9050/{instance.collection}/items/{instance.properties.instance_id}",
json=patch_data,
)
if patch_result.status_code >= 300:
click.echo("Test [2/3]: Failed")
click.echo(
f"Failed to add node to item, Error: \n{patch_result.content}, Status: {patch_result.status_code}\n"
)
else:
click.echo("Test [2/3]: Passed")
click.echo(
f"Added node to item {instance.properties.instance_id} in collection {instance.collection}, Status: {result.status_code}\n"
)
# Retraction
remove_patch = {"Properties": {"retracted": True}}
remove_result = client.patch(
f"http://localhost:9050/{instance.collection}/items/{instance.properties.instance_id}",
json=remove_patch,
)
if remove_result.status_code >= 300:
click.echo("Test [3/3]: Failed")
click.echo(
f"Failed to remove node from item, Error: \n{remove_result.content}, Status: {remove_result.status_code}\n"
)

else:
click.echo("Test [3/3]: Passed")
click.echo(
f"Reomved node from item {instance.properties.instance_id} in collection {instance.collection}, Status: {result.status_code}\n"
)

click.echo("Done")
1 change: 1 addition & 0 deletions esgf-generator/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ xenon = "^0.9.1"
esgf_generator = "esgf_generator.cli:esgf_generator"
esgf_update = "esgf_generator.cli:esgf_update"
esgf_delete = "esgf_generator.cli:esgf_delete"
esgf_generator_test = "esgf_generator.cli:esgf_generator_test"

[tool.mypy]
plugins = [
Expand Down
49 changes: 40 additions & 9 deletions esgf-transaction-api/esgf_transaction_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Any, AsyncGenerator, Optional, Union
from typing import Any, AsyncGenerator, Dict, Optional, Union

import aiokafka
from esgf_playground_utils.config.kafka import Settings
Expand All @@ -12,11 +12,12 @@
Data,
KafkaEvent,
Metadata,
PartialUpdatePayload,
Publisher,
RevokePayload,
UpdatePayload,
)
from fastapi import FastAPI, HTTPException, Request
from fastapi import FastAPI, HTTPException
from stac_pydantic.item import Item
from stac_pydantic.item_collection import ItemCollection

Expand Down Expand Up @@ -48,7 +49,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[Any, Any]:


def item_body(
payload: Union[RevokePayload, UpdatePayload, CreatePayload]
payload: Union[RevokePayload, UpdatePayload, CreatePayload, PartialUpdatePayload]
) -> KafkaEvent:
data = Data(type="STAC", version="1.0.0", payload=payload)
auth = Auth(client_id="esgf-generator", server="docker-compose-local")
Expand Down Expand Up @@ -126,6 +127,14 @@ async def revoke_item_hard(collection_id: str, item_id: str) -> None:
await delete_message(event)


async def revoke_item_soft(collection_id: str, item_id: str, item: dict) -> None:
payload = PartialUpdatePayload(
method="PATCH", collection_id=collection_id, item=item, item_id=item_id
)
event = item_body(payload)
await delete_message(event)


@app.post("/{collection_id}/items", status_code=202)
async def create_item(
collection_id: str, item: Union[Item, ItemCollection]
Expand Down Expand Up @@ -169,13 +178,16 @@ async def update_item(collection_id: str, item_id: str, item: Item) -> Item:
"""
logger.info("Updating %s item", collection_id)

await modify_item(collection_id, item, item_id)
try:
await modify_item(collection_id, item, item_id)
except Exception as e:
(f"Collection {collection_id} not found: {str(e)}")

return item


@app.delete("/{collection_id}/items/{item_id}")
async def delete_item(item_id: str, collection_id: str, request: Request) -> None:
async def delete_item_hard(item_id: str, collection_id: str) -> None:
"""Add DELETE message to kafka event stream.
Args:
Expand All @@ -186,7 +198,26 @@ async def delete_item(item_id: str, collection_id: str, request: Request) -> Non
Optional[stac_types.Item]: The deleted item, or `None` if the item was successfully deleted.
"""
logger.info("Deleting %s item", collection_id)
if request.method == "DELETE":
await revoke_item_hard(collection_id, item_id)
else:
raise NotImplementedError("Soft delete is not implemented yet.")

await revoke_item_hard(collection_id, item_id)

return None


@app.patch("/{collection_id}/items/{item_id}")
async def delete_item_soft(item_id: str, collection_id: str, item: dict) -> None:
"""Add DELETE message to kafka event stream.
Args:
item_id (str): The identifier of the item to delete.
collection_id (str): The identifier of the collection that contains the item.
item (dict): The item data beiing patched.
Returns:
None
"""
logger.info("Deleting %s item", collection_id)

await revoke_item_soft(collection_id, item_id, item)

return None

0 comments on commit 33d46eb

Please sign in to comment.