Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEMPO NO2 reader #188

Merged
merged 25 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e0c9e3b
TEMPO NO2 reader
blychs Aug 23, 2024
cd55cf3
Update monetio/sat/_tempo_l2_no2_mm.py
blychs Aug 25, 2024
b2fa316
Update monetio/sat/_tempo_l2_no2_mm.py
blychs Aug 25, 2024
5344ddc
Update monetio/sat/_tempo_l2_no2_mm.py
blychs Aug 25, 2024
69abb32
Fix bug with flags and add options for vars
blychs Aug 28, 2024
662c5d2
Debug min/max/quality_flag being NaN, decode time
blychs Aug 28, 2024
8c9f93a
Make "minimum" and "maximum" flags mask array
blychs Aug 28, 2024
b31b6f7
Better handle pressure, convert from hPa to Pa
blychs Aug 28, 2024
9d5c8a9
format with pre-commit hooks
blychs Aug 28, 2024
335fd7c
add TEMPO L2 NO2 test
blychs Aug 28, 2024
1098330
Fix always trying to find surface_pressure
blychs Aug 29, 2024
7107e8b
make sure test catches expected warning
blychs Aug 29, 2024
88d9593
Remove unnecessary print and warn statements
blychs Aug 29, 2024
fb4f571
remove lat and lon masking
blychs Aug 29, 2024
b1e7837
Remove (wrong) _FillValue guard
blychs Aug 29, 2024
35179de
Manage time without xr.decode_cf()
blychs Aug 30, 2024
95a106d
Make more consistent with the TROPOMI reader
blychs Aug 30, 2024
6cd1fc9
Update tests/test_tempo_l2.py
blychs Aug 30, 2024
34ad51c
Fix bug in change of dim order for press
blychs Aug 30, 2024
91ab75b
Test also order of dimensions of pressure
blychs Aug 30, 2024
c3e1252
Fix order of __all__
blychs Aug 30, 2024
3152402
Update monetio/sat/_tempo_l2_no2_mm.py
blychs Aug 31, 2024
8b969b1
Merge branch 'feature/_tempo_l2_mm_only' of github.com:NCAR/monetio i…
blychs Aug 31, 2024
74fbc2d
Update monetio/sat/_tempo_l2_no2_mm.py
blychs Sep 5, 2024
4adc279
Update monetio/sat/_tempo_l2_no2_mm.py
blychs Sep 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions monetio/sat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
_mopitt_l3_mm,
_omps_l3_mm,
_omps_nadir_mm,
_tempo_l2_no2_mm,
_tropomi_l2_no2_mm,
goes,
modis_ornl,
Expand All @@ -18,6 +19,7 @@
"_mopitt_l3_mm",
"_omps_l3_mm",
"_omps_nadir_mm",
"_tempo_l2_no2_mm",
"_tropomi_l2_no2_mm",
"goes",
"modis_ornl",
Expand Down
246 changes: 246 additions & 0 deletions monetio/sat/_tempo_l2_no2_mm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
"""TEMPO L2 NO2 data file reader.

History:


"""

import logging
import os
import sys
import warnings
from collections import OrderedDict
from glob import glob
from pathlib import Path

import numpy as np
import xarray as xr
from cftime import num2pydate
from netCDF4 import Dataset


def _open_one_dataset(fname, variable_dict):
    """Read one locally stored TEMPO NO2 Level 2 netCDF file.

    Parameters
    ----------
    fname : str
        Local path to netCDF4 (HDF5) file.
    variable_dict : dict
        Mapping of variable name to a dict of per-variable options.
        Supported options: ``scale`` (multiplicative factor),
        ``minimum`` / ``maximum`` (values outside the range are masked),
        and ``quality_flag_max`` (marks this variable as the quality flag
        and stores the threshold in the dataset attrs).
        NOTE: mutated in place when ``pressure`` is requested without
        ``surface_pressure`` (the latter is added automatically).

    Returns
    -------
    ds : xr.Dataset

    Raises
    ------
    ValueError
        If a requested variable name is not recognized.
    """
    print("reading " + fname)

    ds = xr.Dataset()

    dso = Dataset(fname, "r")
    lon_var = dso.groups["geolocation"]["longitude"]
    lat_var = dso.groups["geolocation"]["latitude"]
    time_var = dso.groups["geolocation"]["time"]
    time_units = time_var.units

    ds["lon"] = (
        ("x", "y"),
        lon_var[:].squeeze(),
        {"long_name": lon_var.long_name, "units": lon_var.units},
    )
    ds["lat"] = (
        ("x", "y"),
        lat_var[:].squeeze(),
        {"long_name": lat_var.long_name, "units": lat_var.units},
    )
    ds["time"] = (
        ("time",),
        num2pydate(time_var[:].squeeze(), time_units),
        {"long_name": time_var.long_name},
    )
    ds = ds.set_coords(["time", "lon", "lat"])

    ds.attrs["reference_time_string"] = dso.time_coverage_start
    ds.attrs["granule_number"] = dso.granule_num
    ds.attrs["scan_num"] = dso.scan_num

    # Pressure is derived from surface_pressure (hybrid coefficients),
    # so surface_pressure must be read if pressure was requested.
    if ("pressure" in variable_dict) and "surface_pressure" not in variable_dict:
        warnings.warn(
            "Calculating pressure in TEMPO data requires surface_pressure. "
            + "Adding surface_pressure to output variables"
        )
        variable_dict["surface_pressure"] = {}

    # Which netCDF group each known variable lives in.
    product_vars = {
        "main_data_quality_flag",
        "vertical_column_troposphere",
        "vertical_column_stratosphere",
        "vertical_column_troposphere_uncertainty",
    }
    geolocation_vars = {
        "latitude_bounds",
        "longitude_bounds",
        "solar_zenith_angle",
        "solar_azimuth_angle",
        "viewing_zenith_angle",
        "viewing_azimuth_angle",
        "relative_azimuth_angle",
    }
    support_vars = {
        "vertical_column_total",
        "vertical_column_total_uncertainty",
        "fitted_slant_column",
        "fitted_slant_column_uncertainty",
        "snow_ice_fraction",
        "terrain_height",
        "ground_pixel_quality_flag",
        "surface_pressure",
        "tropopause_pressure",
        "scattering_weights",
        "gas_profile",
        "albedo",
        "temperature_profile",
        "amf_total",
        # NOTE(review): "amf_diagnositc_flag" reproduces the original spelling;
        # product files likely name it "amf_diagnostic_flag" -- both accepted.
        "amf_diagnositc_flag",
        "amf_diagnostic_flag",
        "eff_cloud_fraction",
        "amf_cloud_fraction",
        "amf_cloud_pressure",
        "amf_troposphere",
        "amf_stratosphere",
    }
    qa_vars = {"fit_rms_residual", "fit_convergence_flag"}

    for varname in variable_dict:
        if varname == "pressure":
            # Derived below via calculate_pressure(); not a file variable.
            # (Previously this fell through the group dispatch and reused a
            # stale ``values_var`` from the preceding loop iteration.)
            continue
        if varname in product_vars:
            values_var = dso.groups["product"][varname]
        elif varname in geolocation_vars:
            values_var = dso.groups["geolocation"][varname]
        elif varname in support_vars:
            values_var = dso.groups["support_data"][varname]
        elif varname in qa_vars:
            values_var = dso.groups["qa_statistics"][varname]
        else:
            # Fail loudly instead of silently reading the wrong variable.
            dso.close()
            raise ValueError(f"Unrecognized TEMPO L2 NO2 variable: {varname!r}")

        values = values_var[:].squeeze()

        options = variable_dict[varname]
        if "scale" in options:
            values[:] = options["scale"] * values[:]

        if "minimum" in options:
            values = np.ma.masked_less(values, options["minimum"], copy=False)

        if "maximum" in options:
            values = np.ma.masked_greater(values, options["maximum"], copy=False)

        # The first two file dimensions map to ("x", "y"); a trailing
        # "corner" or "swt_level" dimension, when present, is preserved.
        if "corner" in values_var.dimensions:
            ds[varname] = (("x", "y", "corner"), values, values_var.__dict__)
        elif "swt_level" in values_var.dimensions:
            ds[varname] = (("x", "y", "swt_level"), values, values_var.__dict__)
        else:
            ds[varname] = (("x", "y"), values, values_var.__dict__)

        if "quality_flag_max" in options:
            ds.attrs["quality_flag"] = varname
            ds.attrs["quality_thresh_max"] = options["quality_flag_max"]

    dso.close()

    if "surface_pressure" in variable_dict:
        if ds["surface_pressure"].attrs["units"] == "hPa":
            HPA2PA = 100
            ds["surface_pressure"][:] = ds["surface_pressure"].values * HPA2PA
            ds["surface_pressure"].attrs["units"] = "Pa"
            ds["surface_pressure"].attrs["valid_min"] *= HPA2PA
            ds["surface_pressure"].attrs["valid_max"] *= HPA2PA
            # Eta_A carries pressure units; Eta_B is dimensionless.
            ds["surface_pressure"].attrs["Eta_A"] *= HPA2PA
    if "pressure" in variable_dict:
        ds["pressure"] = calculate_pressure(ds)

    return ds


def calculate_pressure(ds):
    """Calculate pressure at the staggered (interface) levels from hybrid coords.

    Pressure is computed as ``Eta_A + Eta_B * surface_pressure`` for each
    staggered level, broadcast over the whole swath.

    Parameters
    ----------
    ds : xr.Dataset
        Must contain ``surface_pressure`` with ``Eta_A`` and ``Eta_B``
        hybrid-coefficient attributes (one value per staggered level).

    Returns
    -------
    pressure : xr.DataArray
        Pressure at layer interfaces with dims ``("swt_level_stagg", "x", "y")``,
        in the same units as ``surface_pressure``.
    """
    surf_pressure = ds["surface_pressure"]
    eta_a = np.asarray(surf_pressure.Eta_A)
    eta_b = np.asarray(surf_pressure.Eta_B)
    # Broadcast (n_layers, 1, 1) against (x, y) -> (n_layers, x, y);
    # equivalent to a per-level loop but vectorized.
    press = eta_a[:, None, None] + eta_b[:, None, None] * surf_pressure.values[None, :, :]
    pressure = xr.DataArray(
        data=press,
        dims=("swt_level_stagg", "x", "y"),
        coords={
            "lon": (["x", "y"], surf_pressure.lon.values),
            "lat": (["x", "y"], surf_pressure.lat.values),
        },
        attrs={
            "long_name": "pressure",
            "units": surf_pressure.attrs["units"],
            "valid_min": surf_pressure.attrs["valid_min"],
            "valid_max": surf_pressure.attrs["valid_max"],
            "algorithm": "Calculated from hybrid coords as Eta_A + Eta_B * surface_pressure",
            "_FillValue": np.nan,
        },
    )
    return pressure


def apply_quality_flag(ds):
    """Mask variables in place based on quality flag

    Parameters
    ----------
    ds : xr.Dataset
    """
    if "quality_flag" not in ds.attrs:
        return

    flag_name = ds.attrs["quality_flag"]
    flag = ds[flag_name]
    thresh_max = ds.attrs["quality_thresh_max"]

    # Mask every variable (except the flag itself) where the flag exceeds
    # the threshold.  ``~(flag > thresh)`` is used rather than
    # ``flag <= thresh`` so cells where the flag is NaN are kept.
    for varname in ds:
        if varname == flag_name:
            continue
        logging.debug(varname)
        ds[varname] = ds[varname].where(~(flag > thresh_max))


def open_dataset(fnames, variable_dict, debug=False):
    """Open one or more TEMPO L2 NO2 granules.

    Parameters
    ----------
    fnames : str or Path
        Glob expression for the file(s) to read.  Path components of the
        form ``$VARNAME`` are expanded from the environment.
    variable_dict : str or dict or sequence of str
        Variable name(s) to read.  A dict maps names to per-variable
        options (see :func:`_open_one_dataset`).
    debug : bool, optional
        Turn on debug logging to stdout.

    Returns
    -------
    OrderedDict
        Mapping of granule reference-time string to granule (xr.Dataset).

    Raises
    ------
    ValueError
        If a ``$VARNAME`` path component is not defined in the environment.
    """
    if debug:
        logging_level = logging.DEBUG
        logging.basicConfig(stream=sys.stdout, level=logging_level)

    if isinstance(fnames, Path):
        fnames = fnames.as_posix()

    # Normalize variable_dict to {varname: options}.
    if isinstance(variable_dict, str):
        variable_dict = {variable_dict: {}}
    elif isinstance(variable_dict, dict):
        pass
    else:  # Assume sequence
        variable_dict = {varname: {} for varname in variable_dict}

    # Expand $ENVVAR path components.
    for subpath in fnames.split("/"):
        if "$" in subpath:
            envvar = subpath.replace("$", "")
            envval = os.getenv(envvar)
            if envval is None:
                # ValueError instead of bare Exception so callers can
                # catch it specifically (still caught by `except Exception`).
                raise ValueError("Environment variable not defined: " + subpath)
            else:
                fnames = fnames.replace(subpath, envval)

    print(fnames)
    files = sorted(glob(fnames))

    granules = OrderedDict()

    for file in files:
        granule = _open_one_dataset(file, variable_dict)
        apply_quality_flag(granule)
        granules[granule.attrs["reference_time_string"]] = granule

    return granules
86 changes: 86 additions & 0 deletions tests/test_tempo_l2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import shutil
import warnings
from pathlib import Path

import pytest
from filelock import FileLock

from monetio.sat._tempo_l2_no2_mm import open_dataset

HERE = Path(__file__).parent


def retrieve_test_file():
    """Return the local path to the TEMPO L2 NO2 sample file, downloading it if absent.

    The file is cached under ``tests/data`` so subsequent runs skip the download.
    """
    fn = "sample_TEMPO_NO2_L2_V03_20240826T204005Z_S012G01.nc"

    p = HERE / "data" / fn

    if not p.is_file():
        warnings.warn(f"Downloading test file {fn} for TEMPO NO2 L2 test")

        import requests

        r = requests.get(
            "https://csl.noaa.gov/groups/csl4/modeldata/melodies-monet/data/"
            f"example_observation_data/satellite/{fn}",
            stream=True,
        )
        r.raise_for_status()
        # Write in chunks: with stream=True, accessing r.content would load
        # the whole body into memory, defeating the purpose of streaming.
        with open(p, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)

    return p


@pytest.fixture(scope="module")
def test_file_path(tmp_path_factory, worker_id):
    # Module-scoped fixture providing the sample TEMPO file path.
    # Coordinates the download across pytest-xdist workers so the file is
    # fetched only once (worker_id comes from pytest-xdist).
    if worker_id == "master":
        # Not executing with multiple workers;
        # let pytest's fixture caching do its job
        return retrieve_test_file()

    # Get the temp directory shared by all workers
    root_tmp_dir = tmp_path_factory.getbasetemp().parent

    # Copy to the shared test location
    p_test = root_tmp_dir / "tempo_l2_test.nc"

    # FileLock serializes workers: the first one downloads and copies,
    # the rest find the file already present and return immediately.
    with FileLock(p_test.as_posix() + ".lock"):
        if p_test.is_file():
            return p_test
        else:
            p = retrieve_test_file()
            shutil.copy(p, p_test)
            return p_test


def test_open_dataset(test_file_path):
    # Basic read: a single variable, keyed by the granule reference time.
    vn = "vertical_column_troposphere"
    t_ref = "2024-08-26T20:40:05Z"
    ds = open_dataset(test_file_path, {vn: {}})[t_ref]

    assert set(ds.coords) == {"time", "lat", "lon"}
    assert set(ds) == {vn}
    assert set(ds.attrs) == {"granule_number", "reference_time_string", "scan_num"}

    # Requesting "pressure" without "surface_pressure" must warn and add
    # surface_pressure automatically (it is needed to derive pressure).
    with pytest.warns(
        UserWarning,
        match=(
            "Calculating pressure in TEMPO data requires surface_pressure. "
            + "Adding surface_pressure to output variables"
        ),
    ):
        ds2 = open_dataset(
            test_file_path,
            {vn: {}, "main_data_quality_flag": {"quality_flag_max": 0}, "pressure": {}},
        )[t_ref]
    assert set(ds2.variables) == {
        "lat",
        "lon",
        "main_data_quality_flag",
        "pressure",
        "surface_pressure",
        "time",
        "vertical_column_troposphere",
    }
    # Derived pressure is on staggered levels with level as the leading dim.
    assert ds2["pressure"].dims == ("swt_level_stagg", "x", "y")