⚡ Source gdp adapter using dask #508

Closed
487 changes: 188 additions & 299 deletions clouddrift/adapters/gdp/gdpsource.py

Large diffs are not rendered by default.

35 changes: 28 additions & 7 deletions clouddrift/datasets.py
@@ -156,7 +156,6 @@ def gdp6h(decode_times: bool = True) -> xr.Dataset:
def gdp_source(
    tmp_path: str = adapters.gdp_source._TMP_PATH,
    max: int | None = None,
-   skip_download: bool = False,
    use_fill_values: bool = True,
    decode_times: bool = True,
) -> xr.Dataset:
@@ -167,9 +166,10 @@ def gdp_source(
filesystem. If it is not found, the dataset will be downloaded using the
corresponding adapter function and stored as zarr archive for later access.

- The data is accessed from a public HTTPS server at NOAA's Atlantic
+ The drifter data is accessed from a public HTTPS server at NOAA's Atlantic
Oceanographic and Meteorological Laboratory (AOML) at
- https://www.aoml.noaa.gov/ftp/pub/phod/pub/pazos/data/shane/sst/.
+ https://www.aoml.noaa.gov/ftp/pub/phod/pub/pazos/data/shane/sst/ while the metadata is
+ retrieved from https://www.aoml.noaa.gov/ftp/pub/phod/buoydata.

Reviewer comment (Member):

Please add something like:

"Metadata are obtained from url_of_metadata."

Parameters
----------
@@ -178,9 +178,6 @@
max: int, optional
Maximum number of files to retrieve and parse to generate the aggregate file. Mainly used
for testing purposes.
- skip_download: bool, False (default)
-     If True, skips downloading the data files and the code assumes the files have already been downloaded.
-     This is mainly used to skip downloading files if the remote doesn't provide the HTTP Last-Modified header.
use_fill_values: bool, True (default)
When True, missing metadata fields are replaced with fill values. When False and no metadata
is found for a given drifter its observations are ignored.
@@ -219,13 +216,37 @@ def gdp_source(
start_lon (traj) float64 ...
voltage (obs) float32 ...
wmo_number (traj) int64 ...

If you would like to take advantage of all your CPU cores and speed up the ragged-array
generation process, run the following snippet before calling this function.

>>> from dask.distributed import LocalCluster
>>> cluster = LocalCluster()
>>> client = cluster.get_client()
>>> client.dashboard_link # Copy the link below into your browser to monitor the process.
'http://127.0.0.1:.../status'

You may also need to change the temporary directory used to store intermediate computations
when the default lies on a partition with limited space. Some Unix systems are partitioned so
that the root (/) directory has limited space, typically ~30 GB, while the /home or /Users
directory takes up whatever remains. Since /tmp (the typical location for temporary data and
computations) lies on the root partition, it is limited by that partition's size.

An easy way around this limitation is to point the dask temporary directory to a path on a
partition with more free space. You can inspect partition sizes with the standard `df` Unix
utility, e.g. `df -h`. If you find a suitable path, configure dask to use it instead of the
default (make sure to add this line before starting any computation):

>>> import dask.config as daskc
>>> daskc.set({"temporary-directory": "/home/ksantana/.clouddrift/tmp"})

"""
file_selection_label = "all" if max is None else f"first-{max}"
return _dataset_filecache(
f"gdpsource_agg_{file_selection_label}.zarr",
decode_times,
lambda: adapters.gdp_source.to_raggedarray(
-            tmp_path, skip_download, max, use_fill_values=use_fill_values
+            tmp_path, max, use_fill_values=use_fill_values
),
)
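
As a side note for readers of this diff, here is a minimal usage sketch of the updated entry point after the removal of `skip_download`. It assumes clouddrift and dask.distributed are installed and that the default cache and temporary paths are writable; `max=1` is only illustrative.

```python
# Minimal sketch (assumption: clouddrift and dask.distributed are installed).
# Opens a small slice of the GDP source dataset through the new signature,
# which no longer takes a skip_download argument.
from dask.distributed import LocalCluster

from clouddrift import datasets

if __name__ == "__main__":
    # Optional: a local dask cluster lets the ragged-array build use all CPU cores.
    cluster = LocalCluster()
    client = cluster.get_client()
    print(client.dashboard_link)  # open this link in a browser to monitor progress

    # max=1 limits how many source files are downloaded and parsed (quick smoke test).
    ds = datasets.gdp_source(max=1, use_fill_values=True, decode_times=True)
    print(ds)
```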

6 changes: 3 additions & 3 deletions clouddrift/raggedarray.py
@@ -99,7 +99,7 @@ def from_awkward(
@classmethod
def from_files(
cls,
-   indices: list[int],
+   indices: list[int] | np.ndarray,
preprocess_func: Callable[[int], xr.Dataset],
name_coords: list,
name_meta: list = list(),
@@ -273,7 +273,7 @@ def from_xarray(

@staticmethod
def number_of_observations(
-   rowsize_func: Callable[[int], int], indices: list, **kwargs
+   rowsize_func: Callable[[int], int], indices: list | np.ndarray, **kwargs
) -> np.ndarray:
"""Iterate through the files and evaluate the number of observations.

@@ -343,7 +343,7 @@ def attributes(
@staticmethod
def allocate(
preprocess_func: Callable[[int], xr.Dataset],
-   indices: list,
+   indices: list | np.ndarray,
rowsize: list | np.ndarray | xr.DataArray,
name_coords: list,
name_meta: list,
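For context on the `list | np.ndarray` widening above, here is a small sketch of calling `RaggedArray.number_of_observations` with a NumPy index array instead of a list; the `fake_rowsize` helper is hypothetical and stands in for a real per-file observation count lookup.

```python
import numpy as np

from clouddrift.raggedarray import RaggedArray


def fake_rowsize(index: int) -> int:
    # Hypothetical stand-in for a per-file observation count lookup.
    return 10 + index


# Index sets can now be NumPy arrays rather than plain Python lists.
indices = np.arange(5)
rowsizes = RaggedArray.number_of_observations(fake_rowsize, indices)
print(rowsizes)  # expected: [10 11 12 13 14]
```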
3 changes: 1 addition & 2 deletions docs/datasets.rst
@@ -14,8 +14,7 @@ datasets. Currently available datasets are:
- :func:`clouddrift.datasets.gdp6h`: 6-hourly GDP data from a ragged-array
NetCDF file hosted by the public HTTPS server at
`NOAA's Atlantic Oceanographic and Meteorological Laboratory (AOML) <https://www.aoml.noaa.gov/phod/gdp/index.php>`_.
- - :func:`clouddrift.datasets.gdp_source`: source GDP data without being pre-processed unlike
-   the 6-hourly/1-hourly datasets that are derived from it.
+ - :func:`clouddrift.datasets.gdp_source`: Source GDP data from which the 6-hourly and 1-hourly products are derived.
- :func:`clouddrift.datasets.glad`: 15-minute Grand LAgrangian Deployment (GLAD)
data produced by the Consortium for Advanced Research on Transport of
Hydrocarbon in the Environment (CARTHE) and hosted upstream at the `Gulf of
3 changes: 2 additions & 1 deletion environment.yml
@@ -5,7 +5,7 @@ dependencies:
- python>=3.10
- numpy>=1.21.6
- xarray>=2023.5.0
-   - pandas>=1.3.4
+   - pandas>=2.0.0
- h5netcdf>=1.3.0
- netcdf4>=1.6.4
- pyarrow>=9.0.0
@@ -19,3 +19,4 @@ dependencies:
- scipy>=1.11.2
- zarr>=2.14.2
- tenacity>=8.2.3
+   - dask>=2024.5.0
5 changes: 3 additions & 2 deletions pyproject.toml
@@ -35,7 +35,8 @@ dependencies = [
"scipy>=1.11.2",
"xarray>=2023.5.0",
"zarr>=2.14.2",
"tenacity>=8.2.3"
"tenacity>=8.2.3",
"dask>=2024.5.0"
]

[project.optional-dependencies]
@@ -64,7 +65,7 @@ Homepage = "https://github.com/Cloud-Drift/clouddrift"
Documentation = "https://cloud-drift.github.io/clouddrift"

[tool.ruff.lint]
ignore = ["E731", "E741"]
ignore = ["E731", "E741", "E712"]
select = ["E4", "E7", "E9", "F", "I"]

[tool.mypy]
47 changes: 0 additions & 47 deletions tests/adapters/gdp/source_integ_tests.py

This file was deleted.

16 changes: 16 additions & 0 deletions tests/datasets_tests.py
@@ -1,3 +1,5 @@
import unittest

import numpy as np

import tests.utils as testutils
@@ -14,6 +16,20 @@ def test_gdp6h(self):
with datasets.gdp6h() as ds:
self.assertTrue(ds)

@unittest.skip(
"This test takes a really long time to execute and should only be executed locally"
)
def test_gdpsource(self):
with datasets.gdp_source(max=1) as ds:
self.assertTrue(ds is not None)

start_dt_diffs = np.diff(ds["start_date"].data, axis=0)
self.assertEqual(
np.all(start_dt_diffs.astype(np.int64) >= 0).T,
True,
"Drifter segments not ordered by start date",
)

def test_glad(self):
with datasets.glad() as ds:
self.assertTrue(ds)