Skip to content

Commit

Permalink
Added extra command line option for partial update
Browse files Browse the repository at this point in the history
  • Loading branch information
Edbo849 committed Sep 23, 2024
1 parent ad91196 commit 8c5d068
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 35 deletions.
32 changes: 22 additions & 10 deletions esgf-consumer/esgf_consumer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,30 @@
from aiokafka.errors import KafkaError
from esgf_consumer.collection import ensure_collection
from esgf_consumer.consumers import get_consumer
from esgf_consumer.exceptions import (ESGFConsumerNotImplementedPayloadError,
ESGFConsumerUnknownPayloadError)
from esgf_consumer.items import (create_item, hard_delete_item,
soft_delete_item, update_item)
from esgf_consumer.exceptions import (
ESGFConsumerNotImplementedPayloadError,
ESGFConsumerUnknownPayloadError,
)
from esgf_consumer.items import (
create_item,
hard_delete_item,
partial_update_item,
update_item,
)
from esgf_consumer.producers import get_producer
from esgf_playground_utils.config.kafka import Settings
from esgf_playground_utils.models.kafka import (CreatePayload, Error,
ErrorType, KafkaEvent,
PartialUpdatePayload,
RevokePayload, UpdatePayload)
from esgf_playground_utils.models.kafka import (
CreatePayload,
Error,
ErrorType,
KafkaEvent,
PartialUpdatePayload,
RevokePayload,
UpdatePayload,
)
from pydantic import ValidationError


logging.getLogger().setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -166,14 +178,14 @@ async def _handle_message(
logger.critical("Item %s deleted.", event.data.payload.item_id)

case PartialUpdatePayload(method="PATCH"):
await soft_delete_item(
await partial_update_item(
event.data.payload.collection_id,
event.data.payload.item,
event.data.payload.item_id,
settings,
client,
)
logger.critical("Item %s soft deleted.", event.data.payload.item_id)
logger.critical("Item %s partially Updated.", event.data.payload.item_id)

case _:
raise ESGFConsumerUnknownPayloadError
Expand Down
8 changes: 4 additions & 4 deletions esgf-consumer/esgf_consumer/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def hard_delete_item(
return None


async def soft_delete_item(
async def partial_update_item(
collection_id: str,
item: dict,
item_id: str,
Expand All @@ -75,12 +75,12 @@ async def soft_delete_item(
path = f"collections/{collection_id}/items/{item_id}"
url = urljoin(str(settings.stac_server), path)

logger.critical("Soft deleting %s at %s", item_id, url)
logger.critical("Partially updating %s at %s", item_id, url)
result = await client.patch(url, content=json.dumps(item), timeout=5)
if result.status_code < 300:
logger.critical("Item Soft Deleted")
logger.critical("Item Partially Updated")

else:
logger.critical("Item not deleted: %s", result.content)
logger.critical("Item not updated: %s", result.content)

return None
38 changes: 31 additions & 7 deletions esgf-generator/esgf_generator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,18 @@ def esgf_generator(
default=False,
help="Whether to publish items to ESGF, or just print to the console (print happens anyway). Default: --no-publish",
)
@click.option(
"--partial",
type=str,
default="{}",
help="JSON string representing the partial update data. Default: empty dictionary",
)
def esgf_update(
collection_id: str, item_id: str, publish: bool, node: Literal["east", "west"]
collection_id: str,
item_id: str,
publish: bool,
node: Literal["east", "west"],
partial: str,
) -> None:
"""
Update an ESGF item.
Expand All @@ -116,16 +126,30 @@ def esgf_update(

item = data[0]

partial_update_data: Dict[str, Any] = json.loads(partial)

item = update_topic(item, item_id, collection_id)

if publish:
click.echo(f"Updating item {item_id} in collection {collection_id}")
click.echo()
with httpx.Client() as client:
result = client.put(
f"http://localhost:{NODE_PORTS[node]}/{collection_id}/items/{item_id}",
content=item.model_dump_json(),
)
if partial_update_data:
click.echo(
f"Partially updating item {item_id} in collection {collection_id}"
)
click.echo()

result = client.patch(
f"http://localhost:{NODE_PORTS[node]}/{collection_id}/items/{item_id}",
content=json.dumps(partial_update_data),
)

else:
click.echo(f"Updating item {item_id} in collection {collection_id}")
click.echo()
result = client.put(
f"http://localhost:{NODE_PORTS[node]}/{collection_id}/items/{item_id}",
content=item.model_dump_json(),
)
if result.status_code >= 300:
raise Exception(result.content)

Expand Down
34 changes: 20 additions & 14 deletions esgf-transaction-api/esgf_transaction_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@

import aiokafka
from esgf_playground_utils.config.kafka import Settings
from esgf_playground_utils.models.kafka import (Auth, CreatePayload, Data,
KafkaEvent, Metadata,
PartialUpdatePayload,
Publisher, RevokePayload,
UpdatePayload)
from esgf_playground_utils.models.kafka import (
Auth,
CreatePayload,
Data,
KafkaEvent,
Metadata,
PartialUpdatePayload,
Publisher,
RevokePayload,
UpdatePayload,
)
from fastapi import FastAPI, HTTPException
from stac_pydantic.item import Item
from stac_pydantic.item_collection import ItemCollection
Expand Down Expand Up @@ -86,7 +92,7 @@ async def post_message(event: KafkaEvent) -> None:
raise HTTPException(status_code=500, detail=repr(exc)) from exc


async def delete_message(event: KafkaEvent) -> None:
async def alternate_message(event: KafkaEvent) -> None:
try:
value = event.model_dump_json().encode("utf8")
topic = get_topic_alternate(event.data.payload.item_id)
Expand Down Expand Up @@ -118,15 +124,15 @@ async def revoke_item_hard(collection_id: str, item_id: str) -> None:
method="DELETE", collection_id=collection_id, item_id=item_id
)
event = item_body(payload)
await delete_message(event)
await alternate_message(event)


async def revoke_item_soft(collection_id: str, item_id: str, item: dict) -> None:
async def partial_update_item(collection_id: str, item_id: str, item: dict) -> None:
payload = PartialUpdatePayload(
method="PATCH", collection_id=collection_id, item=item, item_id=item_id
)
event = item_body(payload)
await delete_message(event)
await alternate_message(event)


@app.post("/{collection_id}/items", status_code=202)
Expand Down Expand Up @@ -199,19 +205,19 @@ async def delete_item_hard(item_id: str, collection_id: str) -> None:


@app.patch("/{collection_id}/items/{item_id}")
async def delete_item_soft(item_id: str, collection_id: str, item: dict) -> None:
"""Add DELETE message to kafka event stream.
async def partial_update(item_id: str, collection_id: str, item: dict) -> None:
"""Add Update message to kafka event stream.
Args:
item_id (str): The identifier of the item to delete.
item_id (str): The identifier of the item to partially update.
collection_id (str): The identifier of the collection that contains the item.
item (dict): The item data beiing patched.
Returns:
None
"""
logger.info("Deleting %s item", collection_id)
logger.info("Updating %s item", collection_id)

await revoke_item_soft(collection_id, item_id, item)
await partial_update_item(collection_id, item_id, item)

return None

0 comments on commit 8c5d068

Please sign in to comment.