Skip to content

Commit

Permalink
web-catalog: add version filter + docstrings (#242)
Browse files Browse the repository at this point in the history
  • Loading branch information
andersy005 authored Jun 29, 2022
1 parent 799de0e commit 127a719
Showing 1 changed file with 208 additions and 7 deletions.
215 changes: 208 additions & 7 deletions flows/catalogs/web_catalog.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import datetime
import functools
import itertools
import json
import traceback

import fsspec
import intake
import packaging.version
from prefect import Flow, Parameter, task
from upath import UPath

Expand All @@ -18,6 +20,20 @@


def get_license(source_id: str, derived_product: bool = False) -> dict:
"""
Get the license.
Parameters
----------
source_id : str
The source id.
derived_product : bool, optional
Whether the product is derived. The default is False.
Returns
-------
dict
The license."""

LICENSE_MAPPING = {
'ACCESS-CM2': '',
Expand Down Expand Up @@ -178,6 +194,23 @@ def parse_cmip6(store: str, root_path: str) -> dict[str, str]:

@task(log_stdout=True)
def get_cmip6_pyramids(paths: list[str], cdn: str, root_path: str) -> list[dict]:
"""
Get the pyramids for a list of CMIP6 stores.
Parameters
----------
paths : list[str]
List of CMIP6 stores.
cdn : str
CDN to use for the pyramids.
root_path : str
Root path for the pyramids.
Returns
-------
list[dict]
List of pyramids.
"""
import ecgtools

builder = ecgtools.Builder(
Expand All @@ -195,6 +228,26 @@ def parse_cmip6_downscaled_pyramid(
data, cdn: str, root_path: str, derived_product: bool = True
) -> list[dict]:

"""
Parse metadata for given CMIP6 downscaled pyramid.
Parameters
----------
data : str
Prefect run metadata.
cdn : str
CDN to use for the data.
root_path : str
Root path for the data.
derived_product : bool, optional
Whether the data is a derived product.
Defaults to True.
Returns
-------
list[dict]
List of dictionaries with the metadata for the data."""

parameters = data['parameters']
datasets = data['datasets']

Expand All @@ -218,6 +271,7 @@ def parse_cmip6_downscaled_pyramid(
**query,
'method': parameters['method'],
'license': get_license(query['source_id'], derived_product=derived_product),
'cmip6-downscaling-version': data.get('attrs', {}).get('version'),
}
results = []
if 'daily_pyramid_path' in datasets:
Expand Down Expand Up @@ -260,12 +314,127 @@ def parse_cmip6_downscaled_pyramid(
return results


def pick_latest_version(results: list[str]) -> list[str]:
    """
    Pick the latest version of each run.

    Every path is split on ``/``. The fourth-from-last component is parsed
    as a version and the second-from-last component identifies the run
    (e.g. ``.../<version>/runs/<param>/latest.json``). For each distinct run
    identifier only the path carrying the highest version is kept.

    Parameters
    ----------
    results : list[str]
        Paths to consider.

    Returns
    -------
    list[str]
        One path per distinct run identifier, ordered by the first
        appearance of that identifier in ``results``.

    Raises
    ------
    packaging.version.InvalidVersion
        If the version component of a path cannot be parsed.
    IndexError
        If a path has fewer than four ``/``-separated components.
    """

    # Group through a dict rather than itertools.groupby: groupby only merges
    # *adjacent* items sharing a key, so paths ordered by version first would
    # leave every version of a run in its own group and nothing would be dropped.
    latest_by_param = {}
    for path in results:
        parts = path.split('/')
        version = packaging.version.Version(parts[-4])
        param = parts[-2]
        current = latest_by_param.get(param)
        # Strict '>' keeps the first path seen when two versions compare equal.
        if current is None or version > current[0]:
            latest_by_param[param] = (version, path)

    latest_results = [path for _, path in latest_by_param.values()]

    print(f'{len(latest_results)} of {len(results)} results are kept')
    return latest_results


def filter_version_results(
    *, minimum_version: str, maximum_version: str, exclude_local_version: bool, results: list[str]
) -> list[str]:
    """
    Filter the results by version.

    The version of each result is read from the fourth-from-last
    ``/``-separated component of its path.

    Parameters
    ----------
    minimum_version : str
        The minimum version to keep (inclusive).
    maximum_version : str
        The maximum version to keep (inclusive). Any falsy value
        (``None`` or an empty string) means "no upper bound".
    exclude_local_version : bool
        Whether to drop versions that carry a local version segment.
    results : list[str]
        The results to filter.

    Returns
    -------
    list[str]
        The results whose version passes every check, in input order.

    Raises
    ------
    packaging.version.InvalidVersion
        If a bound or the version component of a path cannot be parsed.
    IndexError
        If a path has fewer than four ``/``-separated components.
    """
    lower_bound = packaging.version.Version(minimum_version)
    # Normalise every falsy value to None so that an empty string is treated
    # as "no upper bound" instead of being compared against a Version below.
    upper_bound = packaging.version.Version(maximum_version) if maximum_version else None

    valid_results = []

    for result in results:
        raw_version = result.split("/")[-4]
        print(f"Checking version {raw_version}")
        version = packaging.version.Version(raw_version)

        if version < lower_bound:
            continue
        if upper_bound is not None and version > upper_bound:
            continue
        if exclude_local_version and version.local is not None:
            continue
        valid_results.append(result)

    return valid_results


@task(log_stdout=True)
def get_cmip6_downscaled_pyramids(path, cdn: str, root_path: str):
def get_cmip6_downscaled_pyramids(
*,
path,
cdn: str,
root_path: str,
minimum_version: str,
maximum_version: str = None,
exclude_local_version: bool = True,
):

"""
Get CMIP6 downscaled pyramids.
Parameters
----------
path : str
Path to the CMIP6 downscaled pyramids.
cdn : str
CDN URL.
root_path : str
Root path of the CMIP6 downscaled pyramids.
minimum_version : str
Minimum version of the cmip6-downscaling package.
maximum_version : str, optional
Maximum version of the cmip6-downscaling package. The default is None.
exclude_local_version : bool, optional
Exclude local version of the cmip6-downscaling package. The default is True.
Returns
-------
list[dict]
List of CMIP6 downscaled pyramids.
"""

mapper = fsspec.get_mapper(path)
fs = mapper.fs

latest_runs = fs.glob(path)
latest_runs = filter_version_results(
minimum_version=minimum_version,
maximum_version=maximum_version,
exclude_local_version=exclude_local_version,
results=latest_runs,
)
print(f"Found {len(latest_runs)} runs")
latest_runs = pick_latest_version(latest_runs)
datasets = []
for run in latest_runs:
with fs.open(run) as f:
Expand All @@ -283,11 +452,30 @@ def get_cmip6_downscaled_pyramids(path, cdn: str, root_path: str):
def create_catalog(
*,
catalog_path: str,
cmip6_raw_pyramids=None,
cmip6_downscaled_pyramids=None,
era5_pyramids=None,
cmip6_raw_pyramids: list[dict] = None,
cmip6_downscaled_pyramids: list[dict] = None,
era5_pyramids: list[dict] = None,
):

"""
Create catalog.
Parameters
----------
catalog_path : str
Path to the catalog.
cmip6_raw_pyramids : list[dict], optional
List of CMIP6 raw pyramids. The default is None.
cmip6_downscaled_pyramids : list[dict], optional
List of CMIP6 downscaled pyramids. The default is None.
era5_pyramids : list[dict], optional
List of ERA5 pyramids. The default is None.
Returns
-------
None
"""

datasets = []
if cmip6_raw_pyramids:
datasets += cmip6_raw_pyramids
Expand All @@ -310,6 +498,8 @@ def create_catalog(
json.dump(catalog, f, indent=2)

print(f'web-catalog located at: {catalog_path}')
print(f'{len(datasets)} datasets are included')
print(f'Sample datasets: {datasets[:5]}')


with Flow(
Expand All @@ -321,9 +511,9 @@ def create_catalog(
'web-catalog-path',
default='az://flow-outputs/results/pyramids/combined-cmip6-era5-pyramids-catalog-web.json',
)
downscaled_pyramids = Parameter(
downscaled_pyramids_path = Parameter(
'downscaled-pyramids-path',
default=r'az://flow-outputs/results/0.1.[^\d+$]/runs/*/latest.json',
default=r'az://flow-outputs/results/0.1.[^\d+$]*/runs/*/latest.json',
)

# https://cmip6downscaling.azureedge.net
Expand All @@ -332,7 +522,18 @@ def create_catalog(
root_path = Parameter('root-path', 'https://cmip6downscaling.blob.core.windows.net')

cmip6_raw_pyramids = get_cmip6_pyramids(paths, cdn, root_path)
cmip6_downscaled_pyramids = get_cmip6_downscaled_pyramids(downscaled_pyramids, cdn, root_path)
minimum_version = Parameter('minimum-version', default='0.1.9')
maximum_version = Parameter('maximum-version', default=None)
exclude_local_version = Parameter('exclude-local-version', default=True)

cmip6_downscaled_pyramids = get_cmip6_downscaled_pyramids(
path=downscaled_pyramids_path,
cdn=cdn,
root_path=root_path,
minimum_version=minimum_version,
maximum_version=maximum_version,
exclude_local_version=exclude_local_version,
)
create_catalog(
catalog_path=web_catalog_path,
cmip6_raw_pyramids=cmip6_raw_pyramids,
Expand Down

1 comment on commit 127a719

@vercel
Copy link

@vercel vercel bot commented on 127a719 Jun 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.