Update gdp datasets (fix) (#354)
Update GDP datasets; centralize download logic and enhance it with a retry mechanism.

closes #353
kevinsantana11 authored and Philippe Miron committed Jan 20, 2024
1 parent e66db2d commit c438051
Showing 15 changed files with 286 additions and 205 deletions.
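Every adapter touched by this commit now delegates to a single download_with_progress helper in clouddrift.adapters.utils, called with a list of (url, destination) pairs, where the destination is either a local file path or a writable buffer. The new utils module itself is not expanded in this view, so the following is only a minimal sketch of what a batch downloader with retries and that call signature could look like; the requests/tqdm usage, the max_attempts and chunk_size parameters, and the backoff policy are illustrative assumptions, not the actual implementation.

import os
import time
from io import BufferedIOBase
from typing import Iterable, Tuple, Union

import requests
from tqdm import tqdm


def download_with_progress(
    download_map: Iterable[Tuple[str, Union[str, BufferedIOBase]]],
    desc: str = "Downloading files",
    max_attempts: int = 3,  # assumed retry policy, for illustration only
    chunk_size: int = 1024,
):
    """Download each (url, destination) pair, retrying failed transfers."""
    for url, dst in tqdm(list(download_map), desc=desc, ncols=80):
        # Skip files that already exist on disk (mirrors the old fetch_netcdf behavior).
        if isinstance(dst, str) and os.path.isfile(dst):
            continue
        for attempt in range(max_attempts):
            try:
                if not isinstance(dst, str):
                    dst.seek(0)
                    dst.truncate()  # drop partial data left by a failed attempt
                with requests.get(url, stream=True, timeout=30) as r:
                    r.raise_for_status()
                    sink = open(dst, "wb") if isinstance(dst, str) else dst
                    try:
                        for chunk in r.iter_content(chunk_size=chunk_size):
                            sink.write(chunk)
                    finally:
                        if isinstance(dst, str):
                            sink.close()
                break
            except requests.RequestException:
                if attempt == max_attempts - 1:
                    raise
                time.sleep(2**attempt)  # simple exponential backoff between retries

With a helper along these lines in place, each adapter below reduces to a one-liner such as download_with_progress([(ANDRO_URL, local_file)]).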
27 changes: 20 additions & 7 deletions clouddrift/adapters/__init__.py
@@ -8,10 +8,23 @@
in the future.
"""

import clouddrift.adapters.andro
import clouddrift.adapters.gdp1h
import clouddrift.adapters.gdp6h
import clouddrift.adapters.glad
import clouddrift.adapters.mosaic
import clouddrift.adapters.subsurface_floats
import clouddrift.adapters.yomaha
import clouddrift.adapters.andro as andro
import clouddrift.adapters.gdp1h as gdp1h
import clouddrift.adapters.gdp6h as gdp6h
import clouddrift.adapters.glad as glad
import clouddrift.adapters.mosaic as mosaic
import clouddrift.adapters.subsurface_floats as subsurface_floats
import clouddrift.adapters.yomaha as yomaha
import clouddrift.adapters.utils as utils


__all__ = [
"andro",
"gdp1h",
"gdp6h",
"glad",
"mosaic",
"subsurface_floats",
"yomaha",
"utils",
]
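Aliasing the imports and listing them in __all__ re-exports every adapter, including the new utils module, from the package namespace. A short sketch using only the names shown above; the assertion holds because gdp6h imports download_with_progress from utils:

from clouddrift.adapters import gdp6h, utils

# Every adapter now shares the downloader exposed in utils.
assert gdp6h.download_with_progress is utils.download_with_progress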
4 changes: 2 additions & 2 deletions clouddrift/adapters/andro.py
@@ -17,7 +17,7 @@
SEANOE. https://doi.org/10.17882/47077
"""

from clouddrift.adapters.yomaha import download_with_progress
from clouddrift.adapters.utils import download_with_progress
from datetime import datetime
import numpy as np
import os
@@ -39,7 +39,7 @@ def to_xarray(tmp_path: str = None):

# get or update dataset
local_file = f"{tmp_path}/{ANDRO_URL.split('/')[-1]}"
download_with_progress(ANDRO_URL, local_file)
download_with_progress([(ANDRO_URL, local_file)])

# parse with panda
col_names = [
8 changes: 2 additions & 6 deletions clouddrift/adapters/gdp.py
@@ -5,12 +5,11 @@
and six-hourly (``clouddrift.adapters.gdp6h``) GDP modules.
"""

from clouddrift.adapters.utils import download_with_progress
import numpy as np
import os
import pandas as pd
import xarray as xr
import urllib.request
import warnings

GDP_COORDS = [
"ids",
@@ -188,10 +187,7 @@ def fetch_netcdf(url: str, file: str):
file : str
Name of the file to save.
"""
if not os.path.isfile(file):
urllib.request.urlretrieve(url, file)
else:
warnings.warn(f"{file} already exists; skip download.")
download_with_progress([(url, file)])


def decode_date(t):
45 changes: 21 additions & 24 deletions clouddrift/adapters/gdp1h.py
@@ -6,21 +6,20 @@

import clouddrift.adapters.gdp as gdp
from clouddrift.raggedarray import RaggedArray
from datetime import datetime
from clouddrift.adapters.utils import download_with_progress
from datetime import datetime, timedelta
import numpy as np
import urllib.request
import concurrent.futures
import re
import tempfile
from tqdm import tqdm
from typing import Optional
import os
import warnings
import xarray as xr

GDP_VERSION = "2.01"

GDP_DATA_URL = "https://www.aoml.noaa.gov/ftp/pub/phod/lumpkin/hourly/v2.01/netcdf/"
GDP_DATA_URL = "https://www.aoml.noaa.gov/ftp/pub/phod/buoydata/hourly_product/v2.01/"
GDP_DATA_URL_EXPERIMENTAL = (
"https://www.aoml.noaa.gov/ftp/pub/phod/lumpkin/hourly/experimental/"
)
@@ -108,25 +107,11 @@ def download(
rng = np.random.RandomState(42)
drifter_ids = sorted(rng.choice(drifter_ids, n_random_id, replace=False))

with concurrent.futures.ThreadPoolExecutor() as executor:
# create list of urls and paths
urls = []
files = []
for i in drifter_ids:
file = filename_pattern.format(id=i)
urls.append(os.path.join(url, file))
files.append(os.path.join(tmp_path, file))

# parallel retrieving of individual netCDF files
list(
tqdm(
executor.map(gdp.fetch_netcdf, urls, files),
total=len(files),
desc="Downloading files",
ncols=80,
)
)

download_requests = [
(os.path.join(url, file_name), os.path.join(tmp_path, file_name))
for file_name in map(lambda d_id: filename_pattern.format(id=d_id), drifter_ids)
]
download_with_progress(download_requests)

# Download the metadata so we can order the drifter IDs by end date.
gdp_metadata = gdp.get_gdp_metadata()

@@ -490,6 +475,8 @@ def preprocess(index: int, **kwargs) -> xr.Dataset:
"title": "Global Drifter Program hourly drifting buoy collection",
"history": f"version {GDP_VERSION}. Metadata from dirall.dat and deplog.dat",
"Conventions": "CF-1.6",
"time_coverage_start": "",
"time_coverage_end": "",
"date_created": datetime.now().isoformat(),
"publisher_name": "GDP Drifter DAC",
"publisher_email": "aoml.dftr@noaa.gov",
@@ -602,7 +589,7 @@ def to_raggedarray(
else:
raise ValueError(f"url must be {GDP_DATA_URL} or {GDP_DATA_URL_EXPERIMENTAL}.")

return RaggedArray.from_files(
ra = RaggedArray.from_files(
indices=ids,
preprocess_func=preprocess,
name_coords=gdp.GDP_COORDS,
@@ -612,3 +599,13 @@
filename_pattern=filename_pattern,
tmp_path=tmp_path,
)

# set dynamic global attributes
ra.attrs_global[
"time_coverage_start"
] = f"{datetime(1970,1,1) + timedelta(seconds=int(np.min(ra.coords['time']))):%Y-%m-%d:%H:%M:%SZ}"
ra.attrs_global[
"time_coverage_end"
] = f"{datetime(1970,1,1) + timedelta(seconds=int(np.max(ra.coords['time']))):%Y-%m-%d:%H:%M:%SZ}"

return ra

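The two dynamic attributes set just above assume the time coordinate stores seconds since 1970-01-01; the f-strings convert the ragged array's minimum and maximum times into the %Y-%m-%d:%H:%M:%SZ stamp used for time_coverage_start and time_coverage_end. A self-contained check of that conversion, with a made-up epoch value standing in for ra.coords['time']:

from datetime import datetime, timedelta

epoch_seconds = 1_600_000_000  # hypothetical value standing in for np.min(ra.coords["time"])
stamp = datetime(1970, 1, 1) + timedelta(seconds=int(epoch_seconds))
print(f"{stamp:%Y-%m-%d:%H:%M:%SZ}")  # -> 2020-09-13:12:26:40Z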
59 changes: 30 additions & 29 deletions clouddrift/adapters/gdp6h.py
@@ -5,21 +5,21 @@
"""

import clouddrift.adapters.gdp as gdp
from clouddrift.adapters.utils import download_with_progress
from clouddrift.raggedarray import RaggedArray
from datetime import datetime
from datetime import datetime, timedelta
import numpy as np
import urllib.request
import concurrent.futures
import re
import tempfile
from tqdm import tqdm
from typing import Optional
import os
import warnings
import xarray as xr

GDP_VERSION = "September 2023"

GDP_DATA_URL = "https://www.aoml.noaa.gov/ftp/pub/phod/lumpkin/netcdf/"
GDP_DATA_URL = "https://www.aoml.noaa.gov/ftp/pub/phod/buoydata/6h/"
GDP_TMP_PATH = os.path.join(tempfile.gettempdir(), "clouddrift", "gdp6h")
GDP_DATA = [
"lon",
@@ -57,20 +57,20 @@ def download(
Returns
-------
out : list
List of retrived drifters
List of retrieved drifters
"""

print(f"Downloading GDP 6-hourly data to {tmp_path}...")

# Create a temporary directory if doesn't already exists.
os.makedirs(tmp_path, exist_ok=True)

pattern = "drifter_[0-9]*.nc"
pattern = "drifter_6h_[0-9]*.nc"

directory_list = [
"buoydata_1_5000",
"buoydata_5001_10000",
"buoydata_10001_15000",
"buoydata_15001_oct22",
"netcdf_1_5000",
"netcdf_5001_10000",
"netcdf_10001_15000",
"netcdf_15001_current",
]

# retrieve all drifter ID numbers
@@ -94,25 +94,14 @@
rng = np.random.RandomState(42)
drifter_urls = rng.choice(drifter_urls, n_random_id, replace=False)

with concurrent.futures.ThreadPoolExecutor() as executor:
# Asynchronously download individual netCDF files
list(
tqdm(
executor.map(
gdp.fetch_netcdf,
drifter_urls,
[os.path.join(tmp_path, os.path.basename(f)) for f in drifter_urls],
),
total=len(drifter_urls),
desc="Downloading files",
ncols=80,
)
)
download_with_progress(
[(url, os.path.join(tmp_path, os.path.basename(url))) for url in drifter_urls]
)

# Download the metadata so we can order the drifter IDs by end date.
gdp_metadata = gdp.get_gdp_metadata()
drifter_ids = [
int(os.path.basename(f).split("_")[1].split(".")[0]) for f in drifter_urls
int(os.path.basename(f).split("_")[2].split(".")[0]) for f in drifter_urls
]

return gdp.order_by_date(gdp_metadata, drifter_ids)
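The index change from split("_")[1] to split("_")[2] tracks the new drifter_6h_{id}.nc naming: the numeric ID is now the third underscore-separated token. A quick check with a hypothetical drifter ID and a URL shaped like the netcdf_1_5000 listings above:

import os

# hypothetical URL following the new drifter_6h_{id}.nc pattern
url = "https://www.aoml.noaa.gov/ftp/pub/phod/buoydata/6h/netcdf_1_5000/drifter_6h_101143.nc"
basename = os.path.basename(url)                        # "drifter_6h_101143.nc"
drifter_id = int(basename.split("_")[2].split(".")[0])  # 101143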
@@ -392,9 +381,11 @@ def preprocess(index: int, **kwargs) -> xr.Dataset:

# global attributes
attrs = {
"title": "Global Drifter Program hourly drifting buoy collection",
"history": f"version {gdp.GDP_VERSION}. Metadata from dirall.dat and deplog.dat",
"title": "Global Drifter Program drifting buoy collection",
"history": f"version {GDP_VERSION}. Metadata from dirall.dat and deplog.dat",
"Conventions": "CF-1.6",
"time_coverage_start": "",
"time_coverage_end": "",
"date_created": datetime.now().isoformat(),
"publisher_name": "GDP Drifter DAC",
"publisher_email": "aoml.dftr@noaa.gov",
@@ -485,13 +476,23 @@ def to_raggedarray(
"""
ids = download(drifter_ids, n_random_id, GDP_DATA_URL, tmp_path)

return RaggedArray.from_files(
ra = RaggedArray.from_files(
indices=ids,
preprocess_func=preprocess,
name_coords=gdp.GDP_COORDS,
name_meta=gdp.GDP_METADATA,
name_data=GDP_DATA,
rowsize_func=gdp.rowsize,
filename_pattern="drifter_{id}.nc",
filename_pattern="drifter_6h_{id}.nc",
tmp_path=tmp_path,
)

# update dynamic global attributes
ra.attrs_global[
"time_coverage_start"
] = f"{datetime(1970,1,1) + timedelta(seconds=int(np.min(ra.coords['time']))):%Y-%m-%d:%H:%M:%SZ}"
ra.attrs_global[
"time_coverage_end"
] = f"{datetime(1970,1,1) + timedelta(seconds=int(np.max(ra.coords['time']))):%Y-%m-%d:%H:%M:%SZ}"

return ra

15 changes: 4 additions & 11 deletions clouddrift/adapters/glad.py
@@ -13,11 +13,10 @@
---------
Özgökmen, Tamay. 2013. GLAD experiment CODE-style drifter trajectories (low-pass filtered, 15 minute interval records), northern Gulf of Mexico near DeSoto Canyon, July-October 2012. Distributed by: Gulf of Mexico Research Initiative Information and Data Cooperative (GRIIDC), Harte Research Institute, Texas A&M University–Corpus Christi. doi:10.7266/N7VD6WC8
"""
from io import StringIO
from clouddrift.adapters.utils import download_with_progress
from io import BytesIO
import numpy as np
import pandas as pd
import requests
import tqdm
import xarray as xr


@@ -27,15 +26,9 @@ def get_dataframe() -> pd.DataFrame:
# GRIIDC server doesn't provide Content-Length header, so we'll hardcode
# the expected data length here.
file_size = 155330876
r = requests.get(url, stream=True)
progress_bar = tqdm.tqdm(total=file_size, unit="iB", unit_scale=True)
buf = StringIO()
for chunk in r.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
buf.write(chunk.decode("utf-8"))
progress_bar.update(len(chunk))
buf = BytesIO(b"")
download_with_progress([(url, buf)])
buf.seek(0)
progress_bar.close()
column_names = [
"id",
"date",
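glad.py now streams the CSV into an in-memory buffer through the shared helper rather than hand-rolling a requests loop; the buf.seek(0) rewind before parsing is the one piece of bookkeeping the caller keeps. The same pattern in isolation, with an illustrative URL:

from io import BytesIO
import pandas as pd
from clouddrift.adapters.utils import download_with_progress

buf = BytesIO()
download_with_progress([("https://example.com/drifters.csv", buf)])  # URL is illustrative
buf.seek(0)  # rewind: writing leaves the cursor at the end of the buffer
df = pd.read_csv(buf)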
15 changes: 6 additions & 9 deletions clouddrift/adapters/mosaic.py
@@ -18,15 +18,17 @@
>>> from clouddrift.adapters import mosaic
>>> ds = mosaic.to_xarray()
"""
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from io import BytesIO
import numpy as np
import pandas as pd
import requests
from tqdm import tqdm
import xarray as xr
import xml.etree.ElementTree as ET

from clouddrift.adapters.utils import download_with_progress

MOSAIC_VERSION = "2022"


@@ -56,15 +58,10 @@ def get_dataframes() -> tuple[pd.DataFrame, pd.DataFrame]:
range(len(sensor_ids)), key=lambda k: order_index[sensor_ids[k]]
)
sorted_data_urls = [data_urls[i] for i in sorted_indices]
buffers = [BytesIO(b"") for _ in range(len(sorted_data_urls))]


with ThreadPoolExecutor() as executor:
dfs = tqdm(
executor.map(pd.read_csv, sorted_data_urls),
total=len(sorted_data_urls),
desc="Downloading data",
ncols=80,
)

download_with_progress(zip(sorted_data_urls, buffers), desc="Downloading data")
dfs = [pd.read_csv(b) for b in buffers]

obs_df = pd.concat(dfs)

# Use the index of the concatenated DataFrame to determine the count/rowsize
11 changes: 3 additions & 8 deletions clouddrift/adapters/subsurface_floats.py
@@ -17,10 +17,11 @@
import pandas as pd
import scipy.io
import tempfile
import urllib.request
import xarray as xr
import warnings

from clouddrift.adapters.utils import download_with_progress

SUBSURFACE_FLOATS_DATA_URL = (
"https://www.aoml.noaa.gov/phod/float_traj/files/allFloats_12122017.mat"
)
@@ -31,13 +32,7 @@


def download(file: str):
if not os.path.isfile(file):
print(
f"Downloading Subsurface float trajectories from {SUBSURFACE_FLOATS_DATA_URL} to {file}..."
)
urllib.request.urlretrieve(SUBSURFACE_FLOATS_DATA_URL, file)
else:
warnings.warn(f"{file} already exists; skip download.")
download_with_progress([(SUBSURFACE_FLOATS_DATA_URL, file)])


def to_xarray(
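Since the adapters' public entry points are unchanged, existing workflows keep working while downloads gain retries and shared progress reporting. A hedged end-to-end sketch (network access required; to_raggedarray and n_random_id appear in the diffs above, while to_xarray() is assumed to behave as clouddrift's RaggedArray normally does):

from clouddrift.adapters import gdp6h

# Fetch three randomly chosen drifters (the selection is seeded internally) and
# assemble them into a ragged array, then an xarray Dataset.
ra = gdp6h.to_raggedarray(n_random_id=3)
ds = ra.to_xarray()
print(ds.attrs.get("time_coverage_start"), ds.attrs.get("time_coverage_end"))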
