Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrades to RnR: deploying initial states and exception handing in the async framework #6

Merged
5 commits merged into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
*.tar.gz binary
*.gz binary
*.gpkg binary
*.nc binary
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ Source/RnR/.venv
Source/RnR/src/rnr/app/core/config.ini
Source/RnR/mock_db/rnr_schema_dump_20240612.dump
Source/RnR/data
Source/RnR/output
Source/RnR/dist

*.ruff_cache
*.pytest_cache
Expand Down
4 changes: 2 additions & 2 deletions Source/RnR/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ services:
- "5432:5432"

hfsubset:
image: ghcr.io/taddyb33/hfsubset-legacy:0.0.1
image: ghcr.io/taddyb33/hfsubset-legacy:0.0.4
ports:
- "8008:8000"
volumes:
Expand All @@ -128,7 +128,7 @@ services:
- "6379:6379"

troute:
image: ghcr.io/taddyb33/t-route-dev:0.0.1
image: ghcr.io/taddyb33/t-route-dev:0.0.2
ports:
- "8004:8000"
volumes:
Expand Down
Empty file.
24 changes: 23 additions & 1 deletion Source/RnR/src/rnr/app/api/client/hfsubset.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,29 @@ async def async_subset(
) -> Dict[str, Any]:
endpoint = f"{base_url}/subset/"
params = {
"comid": feature_id,
"feature_id": feature_id,
"lyrs": [
"divides",
"nexus",
"flowpaths",
"lakes",
"flowpath_attributes",
"network",
"layer_styles",
],
}
return await async_get(endpoint, params)


async def async_downstream(
feature_id: str,
ds_feature_id: str,
base_url: str,
) -> Dict[str, Any]:
endpoint = f"{base_url}/downstream/"
params = {
"feature_id": feature_id,
"downstream_feature_id": ds_feature_id,
"lyrs": [
"divides",
"nexus",
Expand Down
6 changes: 5 additions & 1 deletion Source/RnR/src/rnr/app/api/client/troute.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@ def _get(
def run_troute(
lid: str,
feature_id: str,
mapped_feature_id: str,
start_time: str,
initial_start: float,
num_forecast_days: int,
base_url: str
) -> Dict[str, Any]:
endpoint = f"{base_url}/v4/flow_routing/"
endpoint = f"{base_url}/flow_routing/v4/"
params = {
"lid": lid,
"feature_id": feature_id,
"hy_id": mapped_feature_id,
"initial_start": initial_start,
"start_time": start_time,
"num_forecast_days": num_forecast_days
}
Expand Down
7 changes: 0 additions & 7 deletions Source/RnR/src/rnr/app/api/routes/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,13 @@ async def publish_single_message(
- summary: Summary of processing results
- results: Detailed results for each RFC entry
"""
# connection = start_connection(settings.pika_url)
# channel = start_work_queues(connection, settings)
# Since we are using an identifier, there will be one entry here
rfc_entry = RFCReaderService.get_rfc_data(db, identifier=lid).entries[
0
] # An RFCDatabaseEntries obj is always returned

tasks = [MessagePublisherService.process_rfc_entry(rfc_entry, settings)]
results = await asyncio.gather(*tasks)

# close_connection(connection)

summary = Summary(
total=len(results),
success=sum(1 for r in results if r.get("status") == "success"),
Expand Down Expand Up @@ -98,8 +93,6 @@ async def publish_messages(
- summary: Summary of processing results
- results: Detailed results for each RFC entry
"""
# connection = start_connection(settings.pika_url)
# channel = start_work_queues(connection, settings)
rfc_entries = RFCReaderService.get_rfc_data(
db
).entries # An RFCDatabaseEntries obj is always returned
Expand Down
73 changes: 67 additions & 6 deletions Source/RnR/src/rnr/app/api/routes/rfc.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from typing import Annotated

from src.rnr.app.core.exceptions import NWPSAPIError
from src.rnr.app.api.services.nwps import NWPSService
from fastapi import APIRouter, BackgroundTasks, Depends
from pydantic import BaseModel
from pydantic import BaseModel, ValidationError
from sqlalchemy.orm import Session
import httpx

from src.rnr.app.api.client.hfsubset import async_subset, subset
from src.rnr.app.api.client.hfsubset import async_subset, subset, async_downstream
from src.rnr.app.api.database import get_db
from src.rnr.app.api.services.rfc import RFCReaderService
from src.rnr.app.core.cache import get_settings
Expand All @@ -16,7 +19,7 @@


class Message(BaseModel):
message: str = "Creating Hydrofabric Subsets in the background. Use /api/v1/rfc/build_rfc_hydrofabric_subset/{comid} to see if your LID has been processed"
message: str = "Creating Hydrofabric Subsets in the background. Use /api/v1/rfc/build_rfc_hydrofabric_subset/{feature_id} to see if your LID has been processed"


@router.get("/", response_model=RFCDatabaseEntries)
Expand Down Expand Up @@ -80,12 +83,55 @@ def build_single_rfc_location(
return subset_locations


@router.post("/build_rfc_hydrofabric_subsets/", response_model=Message)
async def build_rfc_locations(
# @router.post("/build_rfc_hydrofabric_subsets/", response_model=Message)
# async def build_rfc_locations(
# background_tasks: BackgroundTasks,
# settings: Annotated[Settings, Depends(get_settings)],
# db: Session = Depends(get_db),
# ) -> SubsetLocations:
# rfc_entries = RFCReaderService.get_rfc_data(
# db
# ).entries # An RFCDatabaseEntries obj is always returned

# limiter = AsyncRateLimiter(
# rate_limit=15, time_period=1
# ) # Setting a Rate Limit for Async Requests at 15 stations per second

# async def limited_process(entry):
# async with limiter:
# if entry.feature_id is not None:
# return await async_subset(entry.feature_id, settings.base_subset_url)
# else:
# print(
# f"{entry.nws_lid} does not have an attached feature ID. Cannot route"
# )

# [background_tasks.add_task(limited_process, rfc_entry) for rfc_entry in rfc_entries]
# return Message()


@router.post("/build_rfc_geopackages", response_model=Message)
async def build_rfc_domain(
background_tasks: BackgroundTasks,
settings: Annotated[Settings, Depends(get_settings)],
db: Session = Depends(get_db),
) -> SubsetLocations:
"""Builds a subset geopackage to learn the mapped feature_id (HY_ID) for a feature id. Then, gets the downstream RFC point's geofabric

Parameters:
-----------
background_tasks: BackgroundTasks
FastAPI background tasks
settings: Annotated[Settings, Depends(get_settings)]
pydantic settings
db: Session = Depends(get_db),
The database connection

Returns:
--------
Message:
BaseModel
"""
rfc_entries = RFCReaderService.get_rfc_data(
db
).entries # An RFCDatabaseEntries obj is always returned
Expand All @@ -97,11 +143,26 @@ async def build_rfc_locations(
async def limited_process(entry):
async with limiter:
if entry.feature_id is not None:
return await async_subset(entry.feature_id, settings.base_subset_url)
try:
gauge_data = await NWPSService.get_gauge_data(entry.nws_lid, settings)
rfc_ds = RFCReaderService.get_rfc_data(db, identifier=gauge_data.downstreamLid).entries[0]
_ = await async_subset(entry.feature_id, settings.base_subset_url)
return await async_downstream(entry.feature_id, rfc_ds.feature_id, settings.base_subset_url)
except ValidationError:
print(f"{entry.nws_lid} is not within the RFC Database. Cannot route")
return None
except httpx.HTTPStatusError:
print(f"Unprocessable lid: {entry.nws_lid}, downstream lid: {rfc_ds.nws_lid} 422 Error")
return None
except NWPSAPIError as e:
print(f"NWPSAPIError Unprocessable lid: {entry.nws_lid}, verify there is a downstream RFC station. {e.__str__()}")
return None

else:
print(
f"{entry.nws_lid} does not have an attached feature ID. Cannot route"
)
return None

[background_tasks.add_task(limited_process, rfc_entry) for rfc_entry in rfc_entries]
return Message()
11 changes: 11 additions & 0 deletions Source/RnR/src/rnr/app/api/services/nwps.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ async def get_gauge_product_forecast(
"""
api_url = settings.base_url
gauge_forecast = await gauge_product(identifier, api_url, "forecast")
gauge_observations = await gauge_product(identifier, api_url, "observed")

latest_observation_units = gauge_observations["secondaryUnits"]
latest_observation_flow = [gauge_observations["data"][-1]["secondary"]]

times = [
datetime.fromisoformat(entry["validTime"].rstrip("Z"))
for entry in gauge_forecast["data"]
Expand All @@ -82,11 +87,17 @@ async def get_gauge_product_forecast(
secondary_forecast, gauge_forecast["secondaryUnits"]
)

latest_observation_m3, latest_obs_units = convert_to_m3_per_sec(
latest_observation_flow, latest_observation_units
)

return GaugeForecast(
times=times,
primary_name=gauge_forecast["primaryName"],
primary_forecast=primary_forecast,
primary_unit=gauge_forecast["primaryUnits"],
latest_observation=latest_observation_m3,
latest_obs_units=latest_obs_units,
secondary_name=gauge_forecast["secondaryName"],
secondary_forecast=secondary_m3_forecast,
secondary_unit=secondary_units,
Expand Down
5 changes: 2 additions & 3 deletions Source/RnR/src/rnr/app/api/services/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class MessagePublisherService:
@staticmethod
async def process_rfc_entry(
rfc_entry: RFCDatabaseEntry,
# channel: pika.BlockingConnection.channel,
settings: Settings,
) -> Dict[str, str]:
"""
Expand Down Expand Up @@ -109,7 +108,6 @@ async def process_rfc_entry(
if not r_cache.exists(cache_key) or str(r_cache.get(cache_key)) != str(
cache_value
):
# print('value not in cache')
try:
await MessagePublisherService.process_and_publish_messages(
gauge_data, gauge_forecast, rfc_entry, settings
Expand Down Expand Up @@ -158,7 +156,6 @@ async def process_and_publish_messages(
"""
is_flood_observed = gauge_data.status.observed.floodCategory != "no_flooding"
is_flood_forecasted = gauge_data.status.forecast.floodCategory != "no_flooding"

processed_data = ProcessedData(
lid=gauge_data.lid,
upstream_lid=gauge_data.upstreamLid,
Expand All @@ -174,6 +171,8 @@ async def process_and_publish_messages(
timeZone=gauge_data.timeZone,
latitude=gauge_data.latitude,
longitude=gauge_data.longitude,
latest_observation=gauge_forecast.latest_observation,
latest_obs_units=gauge_forecast.latest_obs_units,
status=gauge_data.status,
times=gauge_forecast.times,
primary_name=gauge_forecast.primary_name,
Expand Down
Loading
Loading