diff --git a/monetio/sat/__init__.py b/monetio/sat/__init__.py
index 2ceb4de9..adbe2029 100644
--- a/monetio/sat/__init__.py
+++ b/monetio/sat/__init__.py
@@ -4,6 +4,7 @@
     _mopitt_l3_mm,
     _omps_l3_mm,
     _omps_nadir_mm,
+    _tempo_l2_no2_mm,
     _tropomi_l2_no2_mm,
     goes,
     modis_ornl,
@@ -18,6 +19,7 @@
     "_mopitt_l3_mm",
     "_omps_l3_mm",
     "_omps_nadir_mm",
+    "_tempo_l2_no2_mm",
     "_tropomi_l2_no2_mm",
     "goes",
     "modis_ornl",
diff --git a/monetio/sat/_tempo_l2_no2_mm.py b/monetio/sat/_tempo_l2_no2_mm.py
new file mode 100644
index 00000000..7336b89f
--- /dev/null
+++ b/monetio/sat/_tempo_l2_no2_mm.py
@@ -0,0 +1,270 @@
+"""TEMPO L2 NO2 data file reader.
+
+History:
+
+
+"""
+
+import logging
+import os
+import sys
+import warnings
+from collections import OrderedDict
+from glob import glob
+from pathlib import Path
+
+import numpy as np
+import xarray as xr
+from cftime import num2pydate
+from netCDF4 import Dataset
+
+
+def _open_one_dataset(fname, variable_dict):
+    """Read locally stored INTELSAT TEMPO NO2 level 2
+
+    Parameters
+    ----------
+    fname : str
+        Local path to netCDF4 (HDF5) file.
+    variable_dict : dict
+        Mapping of variable name to options
+        (``scale``, ``minimum``, ``maximum``, ``quality_flag_max``).
+
+    Returns
+    -------
+    ds : xr.Dataset
+    """
+    print("reading " + fname)
+
+    ds = xr.Dataset()
+
+    dso = Dataset(fname, "r")
+    lon_var = dso.groups["geolocation"]["longitude"]
+    lat_var = dso.groups["geolocation"]["latitude"]
+    time_var = dso.groups["geolocation"]["time"]
+    time_units = time_var.units
+
+    ds["lon"] = (
+        ("x", "y"),
+        lon_var[:].squeeze(),
+        {"long_name": lon_var.long_name, "units": lon_var.units},
+    )
+    ds["lat"] = (
+        ("x", "y"),
+        lat_var[:].squeeze(),
+        {"long_name": lat_var.long_name, "units": lat_var.units},
+    )
+    ds["time"] = (
+        ("time",),
+        num2pydate(time_var[:].squeeze(), time_units),
+        {"long_name": time_var.long_name},
+    )
+    ds = ds.set_coords(["time", "lon", "lat"])
+
+    ds.attrs["reference_time_string"] = dso.time_coverage_start
+    ds.attrs["granule_number"] = dso.granule_num
+    ds.attrs["scan_num"] = dso.scan_num
+
+    if ("pressure" in variable_dict) and "surface_pressure" not in variable_dict:
+        warnings.warn(
+            "Calculating pressure in TEMPO data requires surface_pressure. "
+            + "Adding surface_pressure to output variables"
+        )
+        variable_dict["surface_pressure"] = {}
+
+    # "pressure" is not stored in the file; it is derived below from
+    # surface_pressure, so skip it when reading variables from the file.
+    for varname in (vn for vn in variable_dict if vn != "pressure"):
+        if varname in [
+            "main_data_quality_flag",
+            "vertical_column_troposphere",
+            "vertical_column_stratosphere",
+            "vertical_column_troposphere_uncertainty",
+        ]:
+            values_var = dso.groups["product"][varname]
+        elif varname in [
+            "latitude_bounds",
+            "longitude_bounds",
+            "solar_zenith_angle",
+            "solar_azimuth_angle",
+            "viewing_zenith_angle",
+            "viewing_azimuth_angle",
+            "relative_azimuth_angle",
+        ]:
+            values_var = dso.groups["geolocation"][varname]
+        elif varname in [
+            "vertical_column_total",
+            "vertical_column_total_uncertainty",
+            "fitted_slant_column",
+            "fitted_slant_column_uncertainty",
+            "snow_ice_fraction",
+            "terrain_height",
+            "ground_pixel_quality_flag",
+            "surface_pressure",
+            "tropopause_pressure",
+            "scattering_weights",
+            "gas_profile",
+            "albedo",
+            "temperature_profile",
+            "amf_total",
+            # NOTE(review): spelling kept as-is -- confirm against product file
+            "amf_diagnositc_flag",
+            "eff_cloud_fraction",
+            "amf_cloud_fraction",
+            "amf_cloud_pressure",
+            "amf_troposphere",
+            "amf_stratosphere",
+        ]:
+            values_var = dso.groups["support_data"][varname]
+        elif varname in ["fit_rms_residual", "fit_convergence_flag"]:
+            values_var = dso.groups["qa_statistics"][varname]
+        values = values_var[:].squeeze()
+
+        if "scale" in variable_dict[varname]:
+            values[:] = variable_dict[varname]["scale"] * values[:]
+
+        if "minimum" in variable_dict[varname]:
+            minimum = variable_dict[varname]["minimum"]
+            values = np.ma.masked_less(values, minimum, copy=False)
+
+        if "maximum" in variable_dict[varname]:
+            maximum = variable_dict[varname]["maximum"]
+            values = np.ma.masked_greater(values, maximum, copy=False)
+
+        if "corner" in values_var.dimensions:
+            ds[varname] = (("x", "y", "corner"), values, values_var.__dict__)
+        elif "swt_level" in values_var.dimensions:
+            ds[varname] = (("x", "y", "swt_level"), values, values_var.__dict__)
+        else:
+            ds[varname] = (("x", "y"), values, values_var.__dict__)
+
+        if "quality_flag_max" in variable_dict[varname]:
+            ds.attrs["quality_flag"] = varname
+            ds.attrs["quality_thresh_max"] = variable_dict[varname]["quality_flag_max"]
+
+    dso.close()
+
+    if "surface_pressure" in variable_dict:
+        if ds["surface_pressure"].attrs["units"] == "hPa":
+            HPA2PA = 100
+            ds["surface_pressure"][:] = ds["surface_pressure"].values * HPA2PA
+            ds["surface_pressure"].attrs["units"] = "Pa"
+            ds["surface_pressure"].attrs["valid_min"] *= HPA2PA
+            ds["surface_pressure"].attrs["valid_max"] *= HPA2PA
+            ds["surface_pressure"].attrs["Eta_A"] *= HPA2PA
+    if "pressure" in variable_dict:
+        ds["pressure"] = calculate_pressure(ds)
+
+    return ds
+
+
+def calculate_pressure(ds):
+    """Calculate pressure at the staggered swath levels.
+
+    Pressure is computed from the hybrid coordinates as
+    ``Eta_A + Eta_B * surface_pressure``.
+
+    Parameters
+    ----------
+    ds : xr.Dataset
+        Must contain "surface_pressure" with ``Eta_A`` and ``Eta_B`` attributes.
+
+    Returns
+    -------
+    pressure : xr.DataArray
+        Pressure at the staggered levels ("swt_level_stagg", "x", "y"),
+        in the same units as surface_pressure.
+    """
+    surf_pressure = ds["surface_pressure"]
+    eta_a = surf_pressure.Eta_A
+    eta_b = surf_pressure.Eta_B
+    n_layers = len(surf_pressure.Eta_A)
+    press = np.zeros((n_layers, surf_pressure.shape[0], surf_pressure.shape[1]))
+    for k in range(0, n_layers):
+        press[k, :, :] = eta_a[k] + eta_b[k] * surf_pressure.values
+    pressure = xr.DataArray(
+        data=press,
+        dims=("swt_level_stagg", "x", "y"),
+        coords={
+            "lon": (["x", "y"], surf_pressure.lon.values),
+            "lat": (["x", "y"], surf_pressure.lat.values),
+        },
+        attrs={
+            "long_name": "pressure",
+            "units": surf_pressure.attrs["units"],
+            "valid_min": surf_pressure.attrs["valid_min"],
+            "valid_max": surf_pressure.attrs["valid_max"],
+            "algorithm": "Calculated from hybrid coords as Eta_A + Eta_B * surface_pressure",
+            "_FillValue": np.nan,
+        },
+    )
+    return pressure
+
+
+def apply_quality_flag(ds):
+    """Mask variables in place based on quality flag
+
+    Parameters
+    ----------
+    ds : xr.Dataset
+    """
+    if "quality_flag" in ds.attrs:
+        quality_flag = ds[ds.attrs["quality_flag"]]
+        quality_thresh_max = ds.attrs["quality_thresh_max"]
+
+        # Apply the quality thresh maximum to all variables in ds
+        for varname in ds:
+            if varname != ds.attrs["quality_flag"]:
+                logging.debug(varname)
+                ds[varname] = ds[varname].where(~(quality_flag > quality_thresh_max))
+
+
+def open_dataset(fnames, variable_dict, debug=False):
+    """Open one or more TEMPO L2 NO2 granule files.
+
+    Parameters
+    ----------
+    fnames : str or Path
+        Glob expression; may contain ``$``-style environment variables.
+    variable_dict : str, sequence of str, or dict
+        Variables to load, optionally with per-variable options.
+    debug : bool
+        Enable debug-level logging.
+
+    Returns
+    -------
+    granules : OrderedDict
+        Mapping of reference time string to granule dataset.
+    """
+    if debug:
+        logging_level = logging.DEBUG
+        logging.basicConfig(stream=sys.stdout, level=logging_level)
+
+    if isinstance(fnames, Path):
+        fnames = fnames.as_posix()
+    if isinstance(variable_dict, str):
+        variable_dict = {variable_dict: {}}
+    elif isinstance(variable_dict, dict):
+        pass
+    else:  # Assume sequence
+        variable_dict = {varname: {} for varname in variable_dict}
+
+    for subpath in fnames.split("/"):
+        if "$" in subpath:
+            envvar = subpath.replace("$", "")
+            envval = os.getenv(envvar)
+            if envval is None:
+                raise Exception("Environment variable not defined: " + subpath)
+            else:
+                fnames = fnames.replace(subpath, envval)
+
+    print(fnames)
+    files = sorted(glob(fnames))
+
+    granules = OrderedDict()
+
+    for file in files:
+        granule = _open_one_dataset(file, variable_dict)
+        apply_quality_flag(granule)
+        granules[granule.attrs["reference_time_string"]] = granule
+
+    return granules
diff --git a/tests/test_tempo_l2.py b/tests/test_tempo_l2.py
new file mode 100644
index 00000000..9f2d7caf
--- /dev/null
+++ b/tests/test_tempo_l2.py
@@ -0,0 +1,86 @@
+import shutil
+import warnings
+from pathlib import Path
+
+import pytest
+from filelock import FileLock
+
+from monetio.sat._tempo_l2_no2_mm import open_dataset
+
+HERE = Path(__file__).parent
+
+
+def retrieve_test_file():
+    fn = "sample_TEMPO_NO2_L2_V03_20240826T204005Z_S012G01.nc"
+
+    p = HERE / "data" / fn
+
+    if not p.is_file():
+        warnings.warn(f"Downloading test file {fn} for TEMPO NO2 L2 test")
+
+        import requests
+
+        r = requests.get(
+            "https://csl.noaa.gov/groups/csl4/modeldata/melodies-monet/data/"
+            f"example_observation_data/satellite/{fn}",
+            stream=True,
+        )
+        r.raise_for_status()
+        with open(p, "wb") as f:
+            f.write(r.content)
+
+    return p
+
+
+@pytest.fixture(scope="module")
+def test_file_path(tmp_path_factory, worker_id):
+    if worker_id == "master":
+        # Not executing with multiple workers;
+        # let pytest's fixture caching do its job
+        return retrieve_test_file()
+
+    # Get the temp directory shared by all workers
+    root_tmp_dir = tmp_path_factory.getbasetemp().parent
+
+    # Copy to the shared test location
+    p_test = root_tmp_dir / "tempo_l2_test.nc"
+
+    with FileLock(p_test.as_posix() + ".lock"):
+        if p_test.is_file():
+            return p_test
+        else:
+            p = retrieve_test_file()
+            shutil.copy(p, p_test)
+            return p_test
+
+
+def test_open_dataset(test_file_path):
+    vn = "vertical_column_troposphere"
+    t_ref = "2024-08-26T20:40:05Z"
+    ds = open_dataset(test_file_path, {vn: {}})[t_ref]
+
+    assert set(ds.coords) == {"time", "lat", "lon"}
+    assert set(ds) == {vn}
+    assert set(ds.attrs) == {"granule_number", "reference_time_string", "scan_num"}
+
+    with pytest.warns(
+        UserWarning,
+        match=(
+            "Calculating pressure in TEMPO data requires surface_pressure. "
+            + "Adding surface_pressure to output variables"
+        ),
+    ):
+        ds2 = open_dataset(
+            test_file_path,
+            {vn: {}, "main_data_quality_flag": {"quality_flag_max": 0}, "pressure": {}},
+        )[t_ref]
+    assert set(ds2.variables) == {
+        "lat",
+        "lon",
+        "main_data_quality_flag",
+        "pressure",
+        "surface_pressure",
+        "time",
+        "vertical_column_troposphere",
+    }
+    assert ds2["pressure"].dims == ("swt_level_stagg", "x", "y")